Skip to content

Commit ed80dba

Browse files
authored
Promote the reshard split shard count summary to a type (#135980)
The semantics of this field are subtle and unintuitive enough that I think having it be a bare int is a little dangerous. Using a full type allows us to centralize logic and documentation to make it safer to use.
1 parent f2881e6 commit ed80dba

File tree

9 files changed

+331
-189
lines changed

9 files changed

+331
-189
lines changed

server/src/main/java/org/elasticsearch/action/bulk/BulkOperation.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
import org.elasticsearch.cluster.metadata.ProjectMetadata;
4242
import org.elasticsearch.cluster.project.ProjectResolver;
4343
import org.elasticsearch.cluster.routing.IndexRouting;
44+
import org.elasticsearch.cluster.routing.SplitShardCountSummary;
4445
import org.elasticsearch.cluster.service.ClusterService;
4546
import org.elasticsearch.common.collect.Iterators;
4647
import org.elasticsearch.common.util.concurrent.AtomicArray;
@@ -403,9 +404,9 @@ private void executeBulkRequestsByShard(
403404

404405
// Get effective shardCount for shardId and pass it on as parameter to new BulkShardRequest
405406
var indexMetadata = project.index(shardId.getIndexName());
406-
int reshardSplitShardCountSummary = 0;
407+
SplitShardCountSummary reshardSplitShardCountSummary = SplitShardCountSummary.UNSET;
407408
if (indexMetadata != null) {
408-
reshardSplitShardCountSummary = indexMetadata.getReshardSplitShardCountSummaryForIndexing(shardId.getId());
409+
reshardSplitShardCountSummary = SplitShardCountSummary.forIndexing(indexMetadata, shardId.getId());
409410
}
410411
BulkShardRequest bulkShardRequest = new BulkShardRequest(
411412
shardId,

server/src/main/java/org/elasticsearch/action/bulk/BulkShardRequest.java

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import org.elasticsearch.action.support.replication.ReplicationRequest;
1919
import org.elasticsearch.action.update.UpdateRequest;
2020
import org.elasticsearch.cluster.metadata.InferenceFieldMetadata;
21+
import org.elasticsearch.cluster.routing.SplitShardCountSummary;
2122
import org.elasticsearch.common.io.stream.StreamInput;
2223
import org.elasticsearch.common.io.stream.StreamOutput;
2324
import org.elasticsearch.common.util.set.Sets;
@@ -50,21 +51,26 @@ public BulkShardRequest(StreamInput in) throws IOException {
5051
}
5152
}
5253

53-
public BulkShardRequest(ShardId shardId, int reshardSplitShardCountSummary, RefreshPolicy refreshPolicy, BulkItemRequest[] items) {
54+
public BulkShardRequest(
55+
ShardId shardId,
56+
SplitShardCountSummary reshardSplitShardCountSummary,
57+
RefreshPolicy refreshPolicy,
58+
BulkItemRequest[] items
59+
) {
5460
this(shardId, reshardSplitShardCountSummary, refreshPolicy, items, false);
5561
}
5662

5763
public BulkShardRequest(ShardId shardId, RefreshPolicy refreshPolicy, BulkItemRequest[] items) {
58-
this(shardId, 0, refreshPolicy, items, false);
64+
this(shardId, SplitShardCountSummary.UNSET, refreshPolicy, items, false);
5965
}
6066

6167
public BulkShardRequest(ShardId shardId, RefreshPolicy refreshPolicy, BulkItemRequest[] items, boolean isSimulated) {
62-
this(shardId, 0, refreshPolicy, items, isSimulated);
68+
this(shardId, SplitShardCountSummary.UNSET, refreshPolicy, items, isSimulated);
6369
}
6470

6571
public BulkShardRequest(
6672
ShardId shardId,
67-
int reshardSplitShardCountSummary,
73+
SplitShardCountSummary reshardSplitShardCountSummary,
6874
RefreshPolicy refreshPolicy,
6975
BulkItemRequest[] items,
7076
boolean isSimulated

server/src/main/java/org/elasticsearch/action/support/replication/ReplicatedWriteRequest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import org.elasticsearch.action.delete.DeleteRequest;
1414
import org.elasticsearch.action.index.IndexRequest;
1515
import org.elasticsearch.action.support.WriteRequest;
16+
import org.elasticsearch.cluster.routing.SplitShardCountSummary;
1617
import org.elasticsearch.common.io.stream.StreamInput;
1718
import org.elasticsearch.common.io.stream.StreamOutput;
1819
import org.elasticsearch.core.Nullable;
@@ -47,7 +48,7 @@ public ReplicatedWriteRequest(@Nullable ShardId shardId) {
4748
super(shardId);
4849
}
4950

50-
public ReplicatedWriteRequest(@Nullable ShardId shardId, int reshardSplitShardCountSummary) {
51+
public ReplicatedWriteRequest(@Nullable ShardId shardId, SplitShardCountSummary reshardSplitShardCountSummary) {
5152
super(shardId, reshardSplitShardCountSummary);
5253
}
5354

server/src/main/java/org/elasticsearch/action/support/replication/ReplicationRequest.java

Lines changed: 10 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -9,14 +9,14 @@
99

1010
package org.elasticsearch.action.support.replication;
1111

12-
import org.elasticsearch.TransportVersion;
1312
import org.elasticsearch.action.ActionRequestValidationException;
1413
import org.elasticsearch.action.IndicesRequest;
1514
import org.elasticsearch.action.LegacyActionRequest;
1615
import org.elasticsearch.action.admin.indices.refresh.TransportShardRefreshAction;
1716
import org.elasticsearch.action.index.IndexRequest;
1817
import org.elasticsearch.action.support.ActiveShardCount;
1918
import org.elasticsearch.action.support.IndicesOptions;
19+
import org.elasticsearch.cluster.routing.SplitShardCountSummary;
2020
import org.elasticsearch.common.io.stream.StreamInput;
2121
import org.elasticsearch.common.io.stream.StreamOutput;
2222
import org.elasticsearch.core.Nullable;
@@ -38,11 +38,6 @@ public abstract class ReplicationRequest<Request extends ReplicationRequest<Requ
3838
implements
3939
IndicesRequest {
4040

41-
// superseded
42-
private static final TransportVersion INDEX_RESHARD_SHARDCOUNT_SUMMARY = TransportVersion.fromName("index_reshard_shardcount_summary");
43-
// bumped to use VInt instead of Int
44-
private static final TransportVersion INDEX_RESHARD_SHARDCOUNT_SMALL = TransportVersion.fromName("index_reshard_shardcount_small");
45-
4641
public static final TimeValue DEFAULT_TIMEOUT = TimeValue.timeValueMinutes(1);
4742

4843
/**
@@ -92,7 +87,7 @@ public abstract class ReplicationRequest<Request extends ReplicationRequest<Requ
9287
* A value of 0 indicates an INVALID reshardSplitShardCountSummary. Hence, a request with INVALID reshardSplitShardCountSummary
9388
* will be treated as a Summary mismatch on the source shard node.
9489
*/
95-
protected final int reshardSplitShardCountSummary;
90+
protected final SplitShardCountSummary reshardSplitShardCountSummary;
9691

9792
/**
9893
* The number of shard copies that must be active before proceeding with the replication action.
@@ -106,10 +101,11 @@ public ReplicationRequest(StreamInput in) throws IOException {
106101
}
107102

108103
public ReplicationRequest(@Nullable ShardId shardId, StreamInput in) throws IOException {
109-
this(shardId, 0, in);
104+
this(shardId, SplitShardCountSummary.UNSET, in);
110105
}
111106

112-
public ReplicationRequest(@Nullable ShardId shardId, int reshardSplitShardCountSummary, StreamInput in) throws IOException {
107+
public ReplicationRequest(@Nullable ShardId shardId, SplitShardCountSummary reshardSplitShardCountSummary, StreamInput in)
108+
throws IOException {
113109
super(in);
114110
final boolean thinRead = shardId != null;
115111
if (thinRead) {
@@ -132,27 +128,21 @@ public ReplicationRequest(@Nullable ShardId shardId, int reshardSplitShardCountS
132128
if (thinRead) {
133129
this.reshardSplitShardCountSummary = reshardSplitShardCountSummary;
134130
} else {
135-
if (in.getTransportVersion().supports(INDEX_RESHARD_SHARDCOUNT_SMALL)) {
136-
this.reshardSplitShardCountSummary = in.readVInt();
137-
} else if (in.getTransportVersion().supports(INDEX_RESHARD_SHARDCOUNT_SUMMARY)) {
138-
this.reshardSplitShardCountSummary = in.readInt();
139-
} else {
140-
this.reshardSplitShardCountSummary = 0;
141-
}
131+
this.reshardSplitShardCountSummary = new SplitShardCountSummary(in);
142132
}
143133
}
144134

145135
/**
146136
* Creates a new request with resolved shard id
147137
*/
148138
public ReplicationRequest(@Nullable ShardId shardId) {
149-
this(shardId, 0);
139+
this(shardId, SplitShardCountSummary.UNSET);
150140
}
151141

152142
/**
153143
* Creates a new request with resolved shard id and reshardSplitShardCountSummary
154144
*/
155-
public ReplicationRequest(@Nullable ShardId shardId, int reshardSplitShardCountSummary) {
145+
public ReplicationRequest(@Nullable ShardId shardId, SplitShardCountSummary reshardSplitShardCountSummary) {
156146
this.index = shardId == null ? null : shardId.getIndexName();
157147
this.shardId = shardId;
158148
this.timeout = DEFAULT_TIMEOUT;
@@ -209,7 +199,7 @@ public ShardId shardId() {
209199
* @return The effective shard count as seen by the coordinator when creating this request.
210200
* can be 0 if this has not yet been resolved.
211201
*/
212-
public int reshardSplitShardCountSummary() {
202+
public SplitShardCountSummary reshardSplitShardCountSummary() {
213203
return reshardSplitShardCountSummary;
214204
}
215205

@@ -267,11 +257,7 @@ public void writeTo(StreamOutput out) throws IOException {
267257
out.writeTimeValue(timeout);
268258
out.writeString(index);
269259
out.writeVLong(routedBasedOnClusterVersion);
270-
if (out.getTransportVersion().supports(INDEX_RESHARD_SHARDCOUNT_SMALL)) {
271-
out.writeVInt(reshardSplitShardCountSummary);
272-
} else if (out.getTransportVersion().supports(INDEX_RESHARD_SHARDCOUNT_SUMMARY)) {
273-
out.writeInt(reshardSplitShardCountSummary);
274-
}
260+
reshardSplitShardCountSummary.writeTo(out);
275261
}
276262

277263
/**

server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import org.elasticsearch.cluster.node.DiscoveryNode;
3434
import org.elasticsearch.cluster.routing.AllocationId;
3535
import org.elasticsearch.cluster.routing.ShardRouting;
36+
import org.elasticsearch.cluster.routing.SplitShardCountSummary;
3637
import org.elasticsearch.cluster.service.ClusterService;
3738
import org.elasticsearch.common.io.stream.StreamInput;
3839
import org.elasticsearch.common.io.stream.StreamOutput;
@@ -469,11 +470,11 @@ void runWithPrimaryShardReference(final PrimaryShardReference primaryShardRefere
469470
throw blockException;
470471
}
471472

472-
int reshardSplitShardCountSummary = primaryRequest.getRequest().reshardSplitShardCountSummary();
473-
assert (reshardSplitShardCountSummary == 0
474-
|| reshardSplitShardCountSummary == indexMetadata.getReshardSplitShardCountSummaryForIndexing(
475-
primaryRequest.getRequest().shardId().getId()
476-
));
473+
SplitShardCountSummary reshardSplitShardCountSummary = primaryRequest.getRequest().reshardSplitShardCountSummary();
474+
assert reshardSplitShardCountSummary.isUnset()
475+
|| reshardSplitShardCountSummary.equals(
476+
SplitShardCountSummary.forIndexing(indexMetadata, primaryRequest.getRequest().shardId().getId())
477+
);
477478
if (primaryShardReference.isRelocated()) {
478479
primaryShardReference.close(); // release shard operation lock as soon as possible
479480
setPhase(replicationTask, "primary_delegation");

server/src/main/java/org/elasticsearch/cluster/metadata/IndexMetadata.java

Lines changed: 0 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -1196,71 +1196,6 @@ public int getNumberOfShards() {
11961196
return numberOfShards;
11971197
}
11981198

1199-
/**
1200-
* This method is used in the context of the resharding feature.
1201-
* Given a {@code shardId} and {@code minShardState} i.e. the minimum target shard state required for
1202-
* an operation to be routed to target shards,
1203-
* this method returns the "effective" shard count as seen by this IndexMetadata.
1204-
*
1205-
* The reshardSplitShardCountSummary tells us whether the coordinator routed requests to the source shard or
1206-
* to both source and target shards. Requests are routed to both source and target shards
1207-
* once the target shards are ready for an operation.
1208-
*
1209-
* The coordinator routes requests to source and target shards, based on its cluster state view of the state of shards
1210-
* undergoing a resharding operation. This method is used to populate a field in the shard level requests sent to
1211-
* source and target shards, as a proxy for the cluster state version. The same calculation is then done at the source shard
1212-
* to verify if the coordinator and source node's view of the resharding state have a mismatch.
1213-
* See {@link org.elasticsearch.action.support.replication.ReplicationRequest#reshardSplitShardCountSummary}
1214-
* for a detailed description of how this value is used.
1215-
*
1216-
* @param shardId Input shardId for which we want to calculate the effective shard count
1217-
* @param minShardState Minimum target shard state required for the target to be considered ready
1218-
* @return Effective shard count as seen by an operation using this IndexMetadata
1219-
*/
1220-
private int getReshardSplitShardCountSummary(int shardId, IndexReshardingState.Split.TargetShardState minShardState) {
1221-
assert shardId >= 0 && shardId < getNumberOfShards() : "shardId is out of bounds";
1222-
int shardCount = getNumberOfShards();
1223-
if (reshardingMetadata != null) {
1224-
if (reshardingMetadata.getSplit().isTargetShard(shardId)) {
1225-
int sourceShardId = reshardingMetadata.getSplit().sourceShard(shardId);
1226-
// Requests cannot be routed to target shards until they are ready
1227-
assert reshardingMetadata.getSplit().allTargetStatesAtLeast(sourceShardId, minShardState) : "unexpected target state";
1228-
shardCount = reshardingMetadata.getSplit().shardCountAfter();
1229-
} else if (reshardingMetadata.getSplit().isSourceShard(shardId)) {
1230-
if (reshardingMetadata.getSplit().allTargetStatesAtLeast(shardId, minShardState)) {
1231-
shardCount = reshardingMetadata.getSplit().shardCountAfter();
1232-
} else {
1233-
shardCount = reshardingMetadata.getSplit().shardCountBefore();
1234-
}
1235-
}
1236-
}
1237-
return shardCount;
1238-
}
1239-
1240-
/**
1241-
* This method is used in the context of the resharding feature.
1242-
* Given a {@code shardId}, this method returns the "effective" shard count
1243-
* as seen by this IndexMetadata, for indexing operations.
1244-
*
1245-
* See {@code getReshardSplitShardCountSummary} for more details.
1246-
* @param shardId Input shardId for which we want to calculate the effective shard count
1247-
*/
1248-
public int getReshardSplitShardCountSummaryForIndexing(int shardId) {
1249-
return (getReshardSplitShardCountSummary(shardId, IndexReshardingState.Split.TargetShardState.HANDOFF));
1250-
}
1251-
1252-
/**
1253-
* This method is used in the context of the resharding feature.
1254-
* Given a {@code shardId}, this method returns the "effective" shard count
1255-
* as seen by this IndexMetadata, for search operations.
1256-
*
1257-
* See {@code getReshardSplitShardCount} for more details.
1258-
* @param shardId Input shardId for which we want to calculate the effective shard count
1259-
*/
1260-
public int getReshardSplitShardCountSummaryForSearch(int shardId) {
1261-
return (getReshardSplitShardCountSummary(shardId, IndexReshardingState.Split.TargetShardState.SPLIT));
1262-
}
1263-
12641199
public int getNumberOfReplicas() {
12651200
return numberOfReplicas;
12661201
}

0 commit comments

Comments
 (0)