99
1010package org .elasticsearch .action .support .replication ;
1111
12+ import org .elasticsearch .TransportVersion ;
1213import org .elasticsearch .action .ActionRequestValidationException ;
1314import org .elasticsearch .action .IndicesRequest ;
1415import org .elasticsearch .action .LegacyActionRequest ;
@@ -40,6 +41,11 @@ public abstract class ReplicationRequest<Request extends ReplicationRequest<Requ
4041
4142 public static final TimeValue DEFAULT_TIMEOUT = TimeValue .timeValueMinutes (1 );
4243
44+ // superseded
45+ private static final TransportVersion INDEX_RESHARD_SHARDCOUNT_SUMMARY = TransportVersion .fromName ("index_reshard_shardcount_summary" );
46+ // bumped to use VInt instead of Int
47+ private static final TransportVersion INDEX_RESHARD_SHARDCOUNT_SMALL = TransportVersion .fromName ("index_reshard_shardcount_small" );
48+
4349 /**
4450 * Target shard the request should execute on. In case of index and delete requests,
4551 * shard id gets resolved by the transport action before performing request operation
@@ -51,41 +57,8 @@ public abstract class ReplicationRequest<Request extends ReplicationRequest<Requ
5157 protected String index ;
5258
5359 /**
54- * The reshardSplitShardCountSummary has been added to accommodate the Resharding feature.
55- * This is populated when the coordinator is deciding which shards a request applies to.
56- * For example, {@link org.elasticsearch.action.bulk.BulkOperation} splits
57- * an incoming bulk request into shard level {@link org.elasticsearch.action.bulk.BulkShardRequest}
58- * based on its cluster state view of the number of shards that are ready for indexing.
59- * The purpose of this metadata is to reconcile the cluster state visible at the coordinating
60- * node with that visible at the source shard node. (w.r.t resharding).
61- * When an index is being split, there is a point in time when the newly created shard (target shard)
62- * takes over its portion of the document space from the original shard (source shard).
63- * Although the handoff is atomic at the original (source shard) and new shards (target shard),
64- * there is a window of time between the coordinating node creating a shard request and the shard receiving and processing it.
65- * This field is used by the original shard (source shard) when it processes the request to detect whether
66- * the coordinator's view of the new shard's state when it created the request matches the shard's current state,
67- * or whether the request must be reprocessed taking into account the current shard states.
68- *
69- * Note that we are able to get away with a single number, instead of an array of target shard states,
70- * because we only allow splits in increments of 2x.
71- *
72- * Example 1:
73- * Suppose we are resharding an index from 2 -> 4 shards. While splitting a bulk request, the coordinator observes
74- * that target shards are not ready for indexing. So requests that are meant for shard 0 and 2 are bundled together,
75- * sent to shard 0 with “reshardSplitShardCountSummary” 2 in the request.
76- * Requests that are meant for shard 1 and 3 are bundled together,
77- * sent to shard 1 with “reshardSplitShardCountSummary” 2 in the request.
78- *
79- * Example 2:
80- * Suppose we are resharding an index from 4 -> 8 shards. While splitting a bulk request, the coordinator observes
81- * that source shard 0 has completed HANDOFF but source shards 1, 2, 3 have not completed handoff.
82- * So, the shard-bulk-request it sends to shard 0 and 4 has the "reshardSplitShardCountSummary" 8,
83- * while the shard-bulk-request it sends to shard 1,2,3 has the "reshardSplitShardCountSummary" 4.
84- * Note that in this case no shard-bulk-request is sent to shards 5, 6, 7 and the requests that were meant for these target shards
85- * are bundled together with and sent to their source shards.
86- *
87- * A value of 0 indicates an INVALID reshardSplitShardCountSummary. Hence, a request with INVALID reshardSplitShardCountSummary
88- * will be treated as a Summary mismatch on the source shard node.
60+ * The reshardSplitShardCountSummary has been added to support in-place resharding.
61+ * See {@link SplitShardCountSummary} for details.
8962 */
9063 protected final SplitShardCountSummary reshardSplitShardCountSummary ;
9164
@@ -128,7 +101,13 @@ public ReplicationRequest(@Nullable ShardId shardId, SplitShardCountSummary resh
128101 if (thinRead ) {
129102 this .reshardSplitShardCountSummary = reshardSplitShardCountSummary ;
130103 } else {
131- this .reshardSplitShardCountSummary = new SplitShardCountSummary (in );
104+ if (in .getTransportVersion ().supports (INDEX_RESHARD_SHARDCOUNT_SMALL )) {
105+ this .reshardSplitShardCountSummary = SplitShardCountSummary .fromInt (in .readVInt ());
106+ } else if (in .getTransportVersion ().supports (INDEX_RESHARD_SHARDCOUNT_SUMMARY )) {
107+ this .reshardSplitShardCountSummary = SplitShardCountSummary .fromInt (in .readInt ());
108+ } else {
109+ this .reshardSplitShardCountSummary = SplitShardCountSummary .UNSET ;
110+ }
132111 }
133112 }
134113
@@ -257,7 +236,11 @@ public void writeTo(StreamOutput out) throws IOException {
257236 out .writeTimeValue (timeout );
258237 out .writeString (index );
259238 out .writeVLong (routedBasedOnClusterVersion );
260- reshardSplitShardCountSummary .writeTo (out );
239+ if (out .getTransportVersion ().supports (INDEX_RESHARD_SHARDCOUNT_SMALL )) {
240+ out .writeVInt (reshardSplitShardCountSummary .asInt ());
241+ } else if (out .getTransportVersion ().supports (INDEX_RESHARD_SHARDCOUNT_SUMMARY )) {
242+ out .writeInt (reshardSplitShardCountSummary .asInt ());
243+ }
261244 }
262245
263246 /**
0 commit comments