
Commit 08b16eb

Introduce IndexReshardingMetadata (#121360)
* Introduce IndexReshardingMetadata

This adds to IndexMetadata the persistent state we will need to track while a split is in progress. Nothing outside test code sets it yet, so it introduces no wire changes for now. Followups will consult this state to make routing decisions and to handle backward compatibility if the object is present in metadata.
1 parent 2761af0 commit 08b16eb

File tree: 6 files changed (+801, -8 lines)

server/src/main/java/org/elasticsearch/cluster/metadata/IndexMetadata.java

Lines changed: 35 additions & 7 deletions
@@ -561,6 +561,8 @@ public Iterator<Setting<?>> settings() {
 
     public static final String KEY_INFERENCE_FIELDS = "field_inference";
 
+    public static final String KEY_RESHARDING = "resharding";
+
     public static final String INDEX_STATE_FILE_PREFIX = "state-";
 
     static final TransportVersion STATS_AND_FORECAST_ADDED = TransportVersions.V_8_6_0;
@@ -654,6 +656,8 @@ public Iterator<Setting<?>> settings() {
     private final Double writeLoadForecast;
     @Nullable
     private final Long shardSizeInBytesForecast;
+    @Nullable
+    private final IndexReshardingMetadata reshardingMetadata;
 
     private IndexMetadata(
         final Index index,
@@ -702,7 +706,8 @@ private IndexMetadata(
         final IndexVersion indexCompatibilityVersion,
         @Nullable final IndexMetadataStats stats,
         @Nullable final Double writeLoadForecast,
-        @Nullable Long shardSizeInBytesForecast
+        @Nullable Long shardSizeInBytesForecast,
+        @Nullable IndexReshardingMetadata reshardingMetadata
     ) {
         this.index = index;
         this.version = version;
@@ -761,6 +766,7 @@ private IndexMetadata(
         this.writeLoadForecast = writeLoadForecast;
         this.shardSizeInBytesForecast = shardSizeInBytesForecast;
         assert numberOfShards * routingFactor == routingNumShards : routingNumShards + " must be a multiple of " + numberOfShards;
+        this.reshardingMetadata = reshardingMetadata;
     }
 
     IndexMetadata withMappingMetadata(MappingMetadata mapping) {
@@ -814,7 +820,8 @@ IndexMetadata withMappingMetadata(MappingMetadata mapping) {
             this.indexCompatibilityVersion,
             this.stats,
             this.writeLoadForecast,
-            this.shardSizeInBytesForecast
+            this.shardSizeInBytesForecast,
+            this.reshardingMetadata
         );
     }
 
@@ -875,7 +882,8 @@ public IndexMetadata withInSyncAllocationIds(int shardId, Set<String> inSyncSet)
             this.indexCompatibilityVersion,
             this.stats,
             this.writeLoadForecast,
-            this.shardSizeInBytesForecast
+            this.shardSizeInBytesForecast,
+            this.reshardingMetadata
         );
     }
 
@@ -934,7 +942,8 @@ public IndexMetadata withIncrementedPrimaryTerm(int shardId) {
             this.indexCompatibilityVersion,
             this.stats,
             this.writeLoadForecast,
-            this.shardSizeInBytesForecast
+            this.shardSizeInBytesForecast,
+            this.reshardingMetadata
         );
     }
 
@@ -994,7 +1003,8 @@ public IndexMetadata withTimestampRanges(IndexLongFieldRange timestampRange, Ind
             this.indexCompatibilityVersion,
             this.stats,
             this.writeLoadForecast,
-            this.shardSizeInBytesForecast
+            this.shardSizeInBytesForecast,
+            this.reshardingMetadata
         );
     }
 
@@ -1049,7 +1059,8 @@ public IndexMetadata withIncrementedVersion() {
             this.indexCompatibilityVersion,
             this.stats,
             this.writeLoadForecast,
-            this.shardSizeInBytesForecast
+            this.shardSizeInBytesForecast,
+            this.reshardingMetadata
         );
     }
 
@@ -1900,6 +1911,7 @@ public static class Builder {
         private IndexMetadataStats stats = null;
         private Double indexWriteLoadForecast = null;
        private Long shardSizeInBytesForecast = null;
+        private IndexReshardingMetadata reshardingMetadata = null;
 
         public Builder(String index) {
             this.index = index;
@@ -1935,6 +1947,7 @@ public Builder(IndexMetadata indexMetadata) {
             this.stats = indexMetadata.stats;
             this.indexWriteLoadForecast = indexMetadata.writeLoadForecast;
             this.shardSizeInBytesForecast = indexMetadata.shardSizeInBytesForecast;
+            this.reshardingMetadata = indexMetadata.reshardingMetadata;
         }
 
         public Builder index(String index) {
@@ -2224,6 +2237,11 @@ public Builder putInferenceFields(Map<String, InferenceFieldMetadata> values) {
             return this;
         }
 
+        public Builder reshardingMetadata(IndexReshardingMetadata reshardingMetadata) {
+            this.reshardingMetadata = reshardingMetadata;
+            return this;
+        }
+
         public IndexMetadata build() {
             return build(false);
         }
@@ -2423,7 +2441,8 @@ IndexMetadata build(boolean repair) {
                 SETTING_INDEX_VERSION_COMPATIBILITY.get(settings),
                 stats,
                 indexWriteLoadForecast,
-                shardSizeInBytesForecast
+                shardSizeInBytesForecast,
+                reshardingMetadata
             );
         }
 
@@ -2563,6 +2582,12 @@ public static void toXContent(IndexMetadata indexMetadata, XContentBuilder build
                 builder.endObject();
             }
 
+            if (indexMetadata.reshardingMetadata != null) {
+                builder.startObject(KEY_RESHARDING);
+                indexMetadata.reshardingMetadata.toXContent(builder, params);
+                builder.endObject();
+            }
+
             builder.endObject();
         }
 
@@ -2649,6 +2674,9 @@ public static IndexMetadata fromXContent(XContentParser parser, Map<String, Mapp
                         builder.putInferenceField(InferenceFieldMetadata.fromXContent(parser));
                     }
                     break;
+                case KEY_RESHARDING:
+                    builder.reshardingMetadata(IndexReshardingMetadata.fromXContent(parser));
+                    break;
                 default:
                     // assume it's custom index metadata
                     builder.putCustom(currentFieldName, parser.mapStrings());
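
Taken together, the hunks above add a nullable reshardingMetadata field, thread it through every with* copy method, expose a Builder setter, and persist it under the "resharding" key. A minimal usage sketch, assuming an existing two-shard IndexMetadata named source (the variable names are illustrative, not from this commit):

    // Record a split from 2 shards to 6 and attach it via the new Builder hook.
    IndexReshardingMetadata resharding = IndexReshardingMetadata.newSplitByMultiple(2, 3);
    IndexMetadata updated = new IndexMetadata.Builder(source)
        .reshardingMetadata(resharding)
        .build();
    // toXContent then emits the state nested as "resharding": { "split": { ... } },
    // and fromXContent restores it through the KEY_RESHARDING case shown above.
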
server/src/main/java/org/elasticsearch/cluster/metadata/IndexReshardingMetadata.java (new file)

Lines changed: 214 additions & 0 deletions
@@ -0,0 +1,214 @@
/*
 * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
 * or more contributor license agreements. Licensed under the "Elastic License
 * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
 * Public License v 1"; you may not use this file except in compliance with, at
 * your election, the "Elastic License 2.0", the "GNU Affero General Public
 * License v3.0 only", or the "Server Side Public License, v 1".
 */

package org.elasticsearch.cluster.metadata;

import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.xcontent.ConstructingObjectParser;
import org.elasticsearch.xcontent.ParseField;
import org.elasticsearch.xcontent.ToXContentFragment;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xcontent.XContentParser;

import java.io.IOException;
import java.util.Objects;

/**
 * IndexReshardingMetadata holds persistent state managing an in-flight index resharding operation
 *
 * Resharding is changing the number of shards that make up an index, in place.
 * We currently only support splitting an index into an integer multiple of its current shard count,
 * e.g., going from 1 to 3 shards, or 2 to 4. This is because we route documents to shards by hash of
 * the document id modulo the shard count. Multiplying the shard count under this scheme lets us move
 * only the fraction of the documents that route to new shards while the rest stay where they were.
 *
 * During a split, we create new shards and then migrate the documents that belong to the new shards
 * according to the routing function to those new shards. While we're moving documents, search requests
 * may be ongoing, or new documents may be indexed. There must not be ambiguity about whether the source
 * shard or the target shards are responsible for documents being indexed or searched while this handoff
 * is occurring, to ensure that we don't lose or double-count documents during the process. We prevent this
 * by maintaining the state of the split on the source and target shards, and making an atomic (from the point
 * of view of indexing and search requests) transition from having the source shard handle requests for documents
 * that belong to the target shard, to having the target shard handle them itself.
 *
 * Before the handoff, the source shard has the entire document collection for both the source and target, and handles
 * indexing and search requests. After the handoff, documents that route to the target are handled by the target,
 * and the source does not necessarily have a complete view - it will be missing any documents that are indexed
 * to the target shard after handoff. Indeed, when the target becomes active, the source filters target documents
 * from its search results, so that they are not counted twice when the target shard is also searched. The handoff
 * is performed at the target by queueing incoming requests prior to entering handoff, waiting for the target to
 * be RUNNING, and then forwarding requests for the target shard to the target. Similarly, when the target first
 * becomes active it must filter out search results containing documents owned by the source shard, which may be
 * present if the target was created by copying the source shard's Lucene files.
 *
 * To ensure that we always route requests to the correct shard, even in the case of failure of either source or
 * target shards during split, we preserve the transition point in persistent state until the split is complete, so
 * that when the source or target recovers, it can resync and route correctly based on that state. This class holds
 * the persistent state required to recover correctly, always maintaining the invariant that only the source shard
 * accepts indexing and search requests for the target prior to handoff, and only the target shard accepts them afterward.
 *
 * The state we preserve is:
 * * The old and new shard counts for a resize operation, so that we can always identify which shards are sources
 *   and which are targets during resharding. For example, old:2 new:6 implies that shard 1 is the source shard for
 *   shards 3 and 5, and shard 2 is the source for shards 4 and 6.
 * * For each source shard, its current source state, which is either `SOURCE` or `DONE`.
 *   - If a source shard may still contain data for any target shard then it is in state `SOURCE`.
 *   - When all targets for a source have moved to `SPLIT` (see below), then the source deletes all documents from
 *     its store that are now the responsibility of the target shards and transitions to `DONE`.
 *     This isn't strictly required to be persistent for correctness, but it can save time on recovery
 *     by allowing a DONE shard to skip interrogating targets and repeating cleanup.
 * * For each target shard, its current target state, which is one of `CLONE`, `HANDOFF`, `SPLIT`, or `DONE`.
 *   - If the target has not yet copied all data from the source shard, then it is in `CLONE`.
 *   - It moves to `HANDOFF` when it has copied all of its data from the source to indicate that it is now ready to
 *     receive indexing actions, and starts RUNNING. After this point, the source may no longer contain the entire contents
 *     of the target and must not index documents belonging to the target. But since search shards can't start up until
 *     their corresponding index shards are active, search requests would fail if they routed to the target shard immediately
 *     after handoff. So at HANDOFF, the source shards continue to service searches, but block refresh since they cannot
 *     be guaranteed to have seen documents indexed after HANDOFF.
 *   - When the target shard's corresponding search replica has started running, the target requests that the source filter
 *     search results belonging to the target, and moves the target shard's state to `SPLIT`. The target's search replica
 *     likewise filters documents not belonging to the target, which may be present due to the target bootstrapping by copying
 *     the source's lucene files.
 *   - Upon entering `SPLIT`, the target starts deleting all documents from its lucene store that do not belong to it. When that
 *     is complete, it moves to `DONE` and removes filters for other shards, which are no longer necessary.
 *
 * Note that each target shard's split operates independently and all may happen concurrently.
 *
 * When all source shards have transitioned to `DONE`, the resize is complete and this metadata may be removed from cluster state.
 * We only allow at most a single resharding operation to be in flight for an index, so removing this metadata is a prerequisite
 * to beginning another resharding operation.
 */
public class IndexReshardingMetadata implements ToXContentFragment, Writeable {
    private static final String SPLIT_FIELD_NAME = "split";
    private static final ParseField SPLIT_FIELD = new ParseField(SPLIT_FIELD_NAME);
    // This exists only so that tests can verify that IndexReshardingMetadata supports more than one kind of operation.
    // It can be removed when we have defined a second real operation, such as shrink.
    private static final String NOOP_FIELD_NAME = "noop";
    private static final ParseField NOOP_FIELD = new ParseField(NOOP_FIELD_NAME);

    private static final ConstructingObjectParser<IndexReshardingMetadata, Void> PARSER = new ConstructingObjectParser<>(
        "index_resharding_metadata",
        args -> {
            // the parser ensures exactly one argument will not be null
            if (args[0] != null) {
                return new IndexReshardingMetadata((IndexReshardingState) args[0]);
            } else {
                return new IndexReshardingMetadata((IndexReshardingState) args[1]);
            }
        }
    );

    static {
        PARSER.declareObjectOrNull(
            ConstructingObjectParser.optionalConstructorArg(),
            (parser, c) -> IndexReshardingState.Split.fromXContent(parser),
            null,
            SPLIT_FIELD
        );
        PARSER.declareObjectOrNull(
            ConstructingObjectParser.optionalConstructorArg(),
            (parser, c) -> IndexReshardingState.Noop.fromXContent(parser),
            null,
            NOOP_FIELD
        );
        PARSER.declareExclusiveFieldSet(SPLIT_FIELD.getPreferredName(), NOOP_FIELD.getPreferredName());
        PARSER.declareRequiredFieldSet(SPLIT_FIELD.getPreferredName(), NOOP_FIELD.getPreferredName());
    }

    private final IndexReshardingState state;

    // visible for testing
    IndexReshardingMetadata(IndexReshardingState state) {
        this.state = state;
    }

    public IndexReshardingMetadata(StreamInput in) throws IOException {
        var stateName = in.readString();

        state = switch (stateName) {
            case NOOP_FIELD_NAME -> new IndexReshardingState.Noop(in);
            case SPLIT_FIELD_NAME -> new IndexReshardingState.Split(in);
            default -> throw new IllegalStateException("unknown operation [" + stateName + "]");
        };
    }

    // for testing
    IndexReshardingState getState() {
        return state;
    }

    static IndexReshardingMetadata fromXContent(XContentParser parser) throws IOException {
        return PARSER.parse(parser, null);
    }

    @Override
    public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
        String name = switch (state) {
            case IndexReshardingState.Noop ignored -> NOOP_FIELD.getPreferredName();
            case IndexReshardingState.Split ignored -> SPLIT_FIELD.getPreferredName();
        };
        builder.startObject(name);
        state.toXContent(builder, params);
        builder.endObject();

        return builder;
    }

    @Override
    public void writeTo(StreamOutput out) throws IOException {
        String name = switch (state) {
            case IndexReshardingState.Noop ignored -> NOOP_FIELD.getPreferredName();
            case IndexReshardingState.Split ignored -> SPLIT_FIELD.getPreferredName();
        };
        out.writeString(name);
        state.writeTo(out);
    }

    @Override
    public boolean equals(Object other) {
        if (this == other) {
            return true;
        }
        if (other == null || getClass() != other.getClass()) {
            return false;
        }
        IndexReshardingMetadata otherMetadata = (IndexReshardingMetadata) other;

        return Objects.equals(state, otherMetadata.state);
    }

    @Override
    public int hashCode() {
        return Objects.hash(state);
    }

    public String toString() {
        return "IndexReshardingMetadata [state=" + state + "]";
    }

    /**
     * Create resharding metadata representing a new split operation
     * Split only supports updating an index to a multiple of its current shard count
     * @param shardCount the number of shards in the index at the start of the operation
     * @param multiple the new shard count is shardCount * multiple
     * @return resharding metadata representing the start of the requested split
     */
    public static IndexReshardingMetadata newSplitByMultiple(int shardCount, int multiple) {
        return new IndexReshardingMetadata(IndexReshardingState.Split.newSplitByMultiple(shardCount, multiple));
    }

    public IndexReshardingState.Split getSplit() {
        return switch (state) {
            case IndexReshardingState.Noop ignored -> throw new IllegalArgumentException("resharding metadata is not a split");
            case IndexReshardingState.Split s -> s;
        };
    }
}
