Skip to content

Commit 15b93c1

Browse files
committed
Move serialization back out of SplitShardCountSummary
Serialization of this field is determined by the serialization of the request in which it is embedded, so it cannot sensibly serialize itself.
1 parent 2b91dd5 commit 15b93c1

File tree

2 files changed

+36
-68
lines changed

2 files changed

+36
-68
lines changed

server/src/main/java/org/elasticsearch/action/support/replication/ReplicationRequest.java

Lines changed: 20 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99

1010
package org.elasticsearch.action.support.replication;
1111

12+
import org.elasticsearch.TransportVersion;
1213
import org.elasticsearch.action.ActionRequestValidationException;
1314
import org.elasticsearch.action.IndicesRequest;
1415
import org.elasticsearch.action.LegacyActionRequest;
@@ -40,6 +41,11 @@ public abstract class ReplicationRequest<Request extends ReplicationRequest<Requ
4041

4142
public static final TimeValue DEFAULT_TIMEOUT = TimeValue.timeValueMinutes(1);
4243

44+
// superseded
45+
private static final TransportVersion INDEX_RESHARD_SHARDCOUNT_SUMMARY = TransportVersion.fromName("index_reshard_shardcount_summary");
46+
// bumped to use VInt instead of Int
47+
private static final TransportVersion INDEX_RESHARD_SHARDCOUNT_SMALL = TransportVersion.fromName("index_reshard_shardcount_small");
48+
4349
/**
4450
* Target shard the request should execute on. In case of index and delete requests,
4551
* shard id gets resolved by the transport action before performing request operation
@@ -51,41 +57,8 @@ public abstract class ReplicationRequest<Request extends ReplicationRequest<Requ
5157
protected String index;
5258

5359
/**
54-
* The reshardSplitShardCountSummary has been added to accommodate the Resharding feature.
55-
* This is populated when the coordinator is deciding which shards a request applies to.
56-
* For example, {@link org.elasticsearch.action.bulk.BulkOperation} splits
57-
* an incoming bulk request into shard level {@link org.elasticsearch.action.bulk.BulkShardRequest}
58-
* based on its cluster state view of the number of shards that are ready for indexing.
59-
* The purpose of this metadata is to reconcile the cluster state visible at the coordinating
60-
* node with that visible at the source shard node. (w.r.t resharding).
61-
* When an index is being split, there is a point in time when the newly created shard (target shard)
62-
* takes over its portion of the document space from the original shard (source shard).
63-
* Although the handoff is atomic at the original (source shard) and new shards (target shard),
64-
* there is a window of time between the coordinating node creating a shard request and the shard receiving and processing it.
65-
* This field is used by the original shard (source shard) when it processes the request to detect whether
66-
* the coordinator's view of the new shard's state when it created the request matches the shard's current state,
67-
* or whether the request must be reprocessed taking into account the current shard states.
68-
*
69-
* Note that we are able to get away with a single number, instead of an array of target shard states,
70-
* because we only allow splits in increments of 2x.
71-
*
72-
* Example 1:
73-
* Suppose we are resharding an index from 2 -> 4 shards. While splitting a bulk request, the coordinator observes
74-
* that target shards are not ready for indexing. So requests that are meant for shard 0 and 2 are bundled together,
75-
* sent to shard 0 with “reshardSplitShardCountSummary” 2 in the request.
76-
* Requests that are meant for shard 1 and 3 are bundled together,
77-
* sent to shard 1 with “reshardSplitShardCountSummary” 2 in the request.
78-
*
79-
* Example 2:
80-
* Suppose we are resharding an index from 4 -> 8 shards. While splitting a bulk request, the coordinator observes
81-
* that source shard 0 has completed HANDOFF but source shards 1, 2, 3 have not completed handoff.
82-
* So, the shard-bulk-request it sends to shard 0 and 4 has the "reshardSplitShardCountSummary" 8,
83-
* while the shard-bulk-request it sends to shard 1,2,3 has the "reshardSplitShardCountSummary" 4.
84-
* Note that in this case no shard-bulk-request is sent to shards 5, 6, 7 and the requests that were meant for these target shards
85-
* are bundled together with and sent to their source shards.
86-
*
87-
* A value of 0 indicates an INVALID reshardSplitShardCountSummary. Hence, a request with INVALID reshardSplitShardCountSummary
88-
* will be treated as a Summary mismatch on the source shard node.
60+
* The reshardSplitShardCountSummary has been added to support in-place resharding.
61+
* See {@link SplitShardCountSummary} for details.
8962
*/
9063
protected final SplitShardCountSummary reshardSplitShardCountSummary;
9164

@@ -128,7 +101,13 @@ public ReplicationRequest(@Nullable ShardId shardId, SplitShardCountSummary resh
128101
if (thinRead) {
129102
this.reshardSplitShardCountSummary = reshardSplitShardCountSummary;
130103
} else {
131-
this.reshardSplitShardCountSummary = new SplitShardCountSummary(in);
104+
if (in.getTransportVersion().supports(INDEX_RESHARD_SHARDCOUNT_SMALL)) {
105+
this.reshardSplitShardCountSummary = SplitShardCountSummary.fromInt(in.readVInt());
106+
} else if (in.getTransportVersion().supports(INDEX_RESHARD_SHARDCOUNT_SUMMARY)) {
107+
this.reshardSplitShardCountSummary = SplitShardCountSummary.fromInt(in.readInt());
108+
} else {
109+
this.reshardSplitShardCountSummary = SplitShardCountSummary.UNSET;
110+
}
132111
}
133112
}
134113

@@ -257,7 +236,11 @@ public void writeTo(StreamOutput out) throws IOException {
257236
out.writeTimeValue(timeout);
258237
out.writeString(index);
259238
out.writeVLong(routedBasedOnClusterVersion);
260-
reshardSplitShardCountSummary.writeTo(out);
239+
if (out.getTransportVersion().supports(INDEX_RESHARD_SHARDCOUNT_SMALL)) {
240+
out.writeVInt(reshardSplitShardCountSummary.asInt());
241+
} else if (out.getTransportVersion().supports(INDEX_RESHARD_SHARDCOUNT_SUMMARY)) {
242+
out.writeInt(reshardSplitShardCountSummary.asInt());
243+
}
261244
}
262245

263246
/**

server/src/main/java/org/elasticsearch/cluster/routing/SplitShardCountSummary.java

Lines changed: 16 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -9,15 +9,9 @@
99

1010
package org.elasticsearch.cluster.routing;
1111

12-
import org.elasticsearch.TransportVersion;
1312
import org.elasticsearch.cluster.metadata.IndexMetadata;
1413
import org.elasticsearch.cluster.metadata.IndexReshardingMetadata;
1514
import org.elasticsearch.cluster.metadata.IndexReshardingState;
16-
import org.elasticsearch.common.io.stream.StreamInput;
17-
import org.elasticsearch.common.io.stream.StreamOutput;
18-
import org.elasticsearch.common.io.stream.Writeable;
19-
20-
import java.io.IOException;
2115

2216
/**
2317
* The SplitShardCountSummary has been added to accommodate in-place index resharding.
@@ -57,12 +51,7 @@
5751
* will be treated as a Summary mismatch on the source shard node.
5852
*/
5953

60-
public class SplitShardCountSummary implements Writeable {
61-
// superseded
62-
private static final TransportVersion INDEX_RESHARD_SHARDCOUNT_SUMMARY = TransportVersion.fromName("index_reshard_shardcount_summary");
63-
// bumped to use VInt instead of Int
64-
private static final TransportVersion INDEX_RESHARD_SHARDCOUNT_SMALL = TransportVersion.fromName("index_reshard_shardcount_small");
65-
54+
public class SplitShardCountSummary {
6655
public static final SplitShardCountSummary UNSET = new SplitShardCountSummary(0);
6756

6857
/**
@@ -137,39 +126,35 @@ private static SplitShardCountSummary getReshardSplitShardCountSummary(
137126
return new SplitShardCountSummary(shardCount);
138127
}
139128

140-
private final int shardCountSummary;
129+
/**
130+
* Construct a SplitShardCountSummary from an integer
131+
* Used for deserialization.
132+
*/
133+
public static SplitShardCountSummary fromInt(int payload) {
134+
return new SplitShardCountSummary(payload);
135+
}
141136

142-
public SplitShardCountSummary(StreamInput in) throws IOException {
143-
if (in.getTransportVersion().supports(INDEX_RESHARD_SHARDCOUNT_SMALL)) {
144-
this.shardCountSummary = in.readVInt();
145-
} else if (in.getTransportVersion().supports(INDEX_RESHARD_SHARDCOUNT_SUMMARY)) {
146-
this.shardCountSummary = in.readInt();
147-
} else {
148-
this.shardCountSummary = UNSET.shardCountSummary;
149-
}
137+
/**
138+
* Return an integer representation of this summary
139+
* Used for serialization.
140+
*/
141+
public int asInt() {
142+
return shardCountSummary;
150143
}
151144

145+
private final int shardCountSummary;
146+
152147
/**
153148
* Returns whether this shard count summary is carrying an actual value or is UNSET
154149
*/
155150
public boolean isUnset() {
156151
return this.shardCountSummary == UNSET.shardCountSummary;
157152
}
158153

159-
// visible for testing
160154
SplitShardCountSummary(int shardCountSummary) {
161155
this.shardCountSummary = shardCountSummary;
162156
}
163157

164-
@Override
165-
public void writeTo(StreamOutput out) throws IOException {
166-
if (out.getTransportVersion().supports(INDEX_RESHARD_SHARDCOUNT_SMALL)) {
167-
out.writeVInt(shardCountSummary);
168-
} else if (out.getTransportVersion().supports(INDEX_RESHARD_SHARDCOUNT_SUMMARY)) {
169-
out.writeInt(shardCountSummary);
170-
}
171-
}
172-
173158
@Override
174159
public boolean equals(Object other) {
175160
if (this == other) {

0 commit comments

Comments
 (0)