Skip to content

Commit 711392c

Browse files
authored
Move serialization back out of SplitShardCountSummary (#136055)
Serialization of this field is determined by the serialization of the request in which it is embedded, so it cannot sensibly serialize itself.
1 parent ff40585 commit 711392c

File tree

2 files changed

+35
-66
lines changed

2 files changed

+35
-66
lines changed

server/src/main/java/org/elasticsearch/action/support/replication/ReplicationRequest.java

Lines changed: 20 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99

1010
package org.elasticsearch.action.support.replication;
1111

12+
import org.elasticsearch.TransportVersion;
1213
import org.elasticsearch.action.ActionRequestValidationException;
1314
import org.elasticsearch.action.IndicesRequest;
1415
import org.elasticsearch.action.LegacyActionRequest;
@@ -40,6 +41,11 @@ public abstract class ReplicationRequest<Request extends ReplicationRequest<Requ
4041

4142
public static final TimeValue DEFAULT_TIMEOUT = TimeValue.timeValueMinutes(1);
4243

44+
// superseded
45+
private static final TransportVersion INDEX_RESHARD_SHARDCOUNT_SUMMARY = TransportVersion.fromName("index_reshard_shardcount_summary");
46+
// bumped to use VInt instead of Int
47+
private static final TransportVersion INDEX_RESHARD_SHARDCOUNT_SMALL = TransportVersion.fromName("index_reshard_shardcount_small");
48+
4349
/**
4450
* Target shard the request should execute on. In case of index and delete requests,
4551
* shard id gets resolved by the transport action before performing request operation
@@ -51,41 +57,8 @@ public abstract class ReplicationRequest<Request extends ReplicationRequest<Requ
5157
protected String index;
5258

5359
/**
54-
* The reshardSplitShardCountSummary has been added to accommodate the Resharding feature.
55-
* This is populated when the coordinator is deciding which shards a request applies to.
56-
* For example, {@link org.elasticsearch.action.bulk.BulkOperation} splits
57-
* an incoming bulk request into shard level {@link org.elasticsearch.action.bulk.BulkShardRequest}
58-
* based on its cluster state view of the number of shards that are ready for indexing.
59-
* The purpose of this metadata is to reconcile the cluster state visible at the coordinating
60-
* node with that visible at the source shard node. (w.r.t resharding).
61-
* When an index is being split, there is a point in time when the newly created shard (target shard)
62-
* takes over its portion of the document space from the original shard (source shard).
63-
* Although the handoff is atomic at the original (source shard) and new shards (target shard),
64-
* there is a window of time between the coordinating node creating a shard request and the shard receiving and processing it.
65-
* This field is used by the original shard (source shard) when it processes the request to detect whether
66-
* the coordinator's view of the new shard's state when it created the request matches the shard's current state,
67-
* or whether the request must be reprocessed taking into account the current shard states.
68-
*
69-
* Note that we are able to get away with a single number, instead of an array of target shard states,
70-
* because we only allow splits in increments of 2x.
71-
*
72-
* Example 1:
73-
* Suppose we are resharding an index from 2 -> 4 shards. While splitting a bulk request, the coordinator observes
74-
* that target shards are not ready for indexing. So requests that are meant for shard 0 and 2 are bundled together,
75-
* sent to shard 0 with “reshardSplitShardCountSummary” 2 in the request.
76-
* Requests that are meant for shard 1 and 3 are bundled together,
77-
* sent to shard 1 with “reshardSplitShardCountSummary” 2 in the request.
78-
*
79-
* Example 2:
80-
* Suppose we are resharding an index from 4 -> 8 shards. While splitting a bulk request, the coordinator observes
81-
* that source shard 0 has completed HANDOFF but source shards 1, 2, 3 have not completed handoff.
82-
* So, the shard-bulk-request it sends to shard 0 and 4 has the "reshardSplitShardCountSummary" 8,
83-
* while the shard-bulk-request it sends to shard 1,2,3 has the "reshardSplitShardCountSummary" 4.
84-
* Note that in this case no shard-bulk-request is sent to shards 5, 6, 7 and the requests that were meant for these target shards
85-
* are bundled together with and sent to their source shards.
86-
*
87-
* A value of 0 indicates an INVALID reshardSplitShardCountSummary. Hence, a request with INVALID reshardSplitShardCountSummary
88-
* will be treated as a Summary mismatch on the source shard node.
60+
* The reshardSplitShardCountSummary has been added to support in-place resharding.
61+
* See {@link SplitShardCountSummary} for details.
8962
*/
9063
protected final SplitShardCountSummary reshardSplitShardCountSummary;
9164

@@ -128,7 +101,13 @@ public ReplicationRequest(@Nullable ShardId shardId, SplitShardCountSummary resh
128101
if (thinRead) {
129102
this.reshardSplitShardCountSummary = reshardSplitShardCountSummary;
130103
} else {
131-
this.reshardSplitShardCountSummary = new SplitShardCountSummary(in);
104+
if (in.getTransportVersion().supports(INDEX_RESHARD_SHARDCOUNT_SMALL)) {
105+
this.reshardSplitShardCountSummary = SplitShardCountSummary.fromInt(in.readVInt());
106+
} else if (in.getTransportVersion().supports(INDEX_RESHARD_SHARDCOUNT_SUMMARY)) {
107+
this.reshardSplitShardCountSummary = SplitShardCountSummary.fromInt(in.readInt());
108+
} else {
109+
this.reshardSplitShardCountSummary = SplitShardCountSummary.UNSET;
110+
}
132111
}
133112
}
134113

@@ -257,7 +236,11 @@ public void writeTo(StreamOutput out) throws IOException {
257236
out.writeTimeValue(timeout);
258237
out.writeString(index);
259238
out.writeVLong(routedBasedOnClusterVersion);
260-
reshardSplitShardCountSummary.writeTo(out);
239+
if (out.getTransportVersion().supports(INDEX_RESHARD_SHARDCOUNT_SMALL)) {
240+
out.writeVInt(reshardSplitShardCountSummary.asInt());
241+
} else if (out.getTransportVersion().supports(INDEX_RESHARD_SHARDCOUNT_SUMMARY)) {
242+
out.writeInt(reshardSplitShardCountSummary.asInt());
243+
}
261244
}
262245

263246
/**

server/src/main/java/org/elasticsearch/cluster/routing/SplitShardCountSummary.java

Lines changed: 15 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -9,15 +9,9 @@
99

1010
package org.elasticsearch.cluster.routing;
1111

12-
import org.elasticsearch.TransportVersion;
1312
import org.elasticsearch.cluster.metadata.IndexMetadata;
1413
import org.elasticsearch.cluster.metadata.IndexReshardingMetadata;
1514
import org.elasticsearch.cluster.metadata.IndexReshardingState;
16-
import org.elasticsearch.common.io.stream.StreamInput;
17-
import org.elasticsearch.common.io.stream.StreamOutput;
18-
import org.elasticsearch.common.io.stream.Writeable;
19-
20-
import java.io.IOException;
2115

2216
/**
2317
* The SplitShardCountSummary has been added to accommodate in-place index resharding.
@@ -57,12 +51,7 @@
5751
* will be treated as a Summary mismatch on the source shard node.
5852
*/
5953

60-
public class SplitShardCountSummary implements Writeable {
61-
// superseded
62-
private static final TransportVersion INDEX_RESHARD_SHARDCOUNT_SUMMARY = TransportVersion.fromName("index_reshard_shardcount_summary");
63-
// bumped to use VInt instead of Int
64-
private static final TransportVersion INDEX_RESHARD_SHARDCOUNT_SMALL = TransportVersion.fromName("index_reshard_shardcount_small");
65-
54+
public class SplitShardCountSummary {
6655
public static final SplitShardCountSummary UNSET = new SplitShardCountSummary(0);
6756

6857
/**
@@ -137,16 +126,22 @@ private static SplitShardCountSummary getReshardSplitShardCountSummary(
137126
return new SplitShardCountSummary(shardCount);
138127
}
139128

129+
/**
130+
* Construct a SplitShardCountSummary from an integer
131+
* Used for deserialization.
132+
*/
133+
public static SplitShardCountSummary fromInt(int payload) {
134+
return new SplitShardCountSummary(payload);
135+
}
136+
140137
private final int shardCountSummary;
141138

142-
public SplitShardCountSummary(StreamInput in) throws IOException {
143-
if (in.getTransportVersion().supports(INDEX_RESHARD_SHARDCOUNT_SMALL)) {
144-
this.shardCountSummary = in.readVInt();
145-
} else if (in.getTransportVersion().supports(INDEX_RESHARD_SHARDCOUNT_SUMMARY)) {
146-
this.shardCountSummary = in.readInt();
147-
} else {
148-
this.shardCountSummary = UNSET.shardCountSummary;
149-
}
139+
/**
140+
* Return an integer representation of this summary
141+
* Used for serialization.
142+
*/
143+
public int asInt() {
144+
return shardCountSummary;
150145
}
151146

152147
/**
@@ -161,15 +156,6 @@ public boolean isUnset() {
161156
this.shardCountSummary = shardCountSummary;
162157
}
163158

164-
@Override
165-
public void writeTo(StreamOutput out) throws IOException {
166-
if (out.getTransportVersion().supports(INDEX_RESHARD_SHARDCOUNT_SMALL)) {
167-
out.writeVInt(shardCountSummary);
168-
} else if (out.getTransportVersion().supports(INDEX_RESHARD_SHARDCOUNT_SUMMARY)) {
169-
out.writeInt(shardCountSummary);
170-
}
171-
}
172-
173159
@Override
174160
public boolean equals(Object other) {
175161
if (this == other) {

0 commit comments

Comments
 (0)