Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

package org.elasticsearch.action.support.replication;

import org.elasticsearch.TransportVersion;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.IndicesRequest;
import org.elasticsearch.action.LegacyActionRequest;
Expand Down Expand Up @@ -40,6 +41,11 @@ public abstract class ReplicationRequest<Request extends ReplicationRequest<Requ

public static final TimeValue DEFAULT_TIMEOUT = TimeValue.timeValueMinutes(1);

// superseded
private static final TransportVersion INDEX_RESHARD_SHARDCOUNT_SUMMARY = TransportVersion.fromName("index_reshard_shardcount_summary");
// bumped to use VInt instead of Int
private static final TransportVersion INDEX_RESHARD_SHARDCOUNT_SMALL = TransportVersion.fromName("index_reshard_shardcount_small");

/**
* Target shard the request should execute on. In case of index and delete requests,
* shard id gets resolved by the transport action before performing request operation
Expand All @@ -51,41 +57,8 @@ public abstract class ReplicationRequest<Request extends ReplicationRequest<Requ
protected String index;

/**
* The reshardSplitShardCountSummary has been added to accommodate the Resharding feature.
* This is populated when the coordinator is deciding which shards a request applies to.
* For example, {@link org.elasticsearch.action.bulk.BulkOperation} splits
* an incoming bulk request into shard level {@link org.elasticsearch.action.bulk.BulkShardRequest}
* based on its cluster state view of the number of shards that are ready for indexing.
* The purpose of this metadata is to reconcile the cluster state visible at the coordinating
* node with that visible at the source shard node. (w.r.t resharding).
* When an index is being split, there is a point in time when the newly created shard (target shard)
* takes over its portion of the document space from the original shard (source shard).
* Although the handoff is atomic at the original (source shard) and new shards (target shard),
* there is a window of time between the coordinating node creating a shard request and the shard receiving and processing it.
* This field is used by the original shard (source shard) when it processes the request to detect whether
* the coordinator's view of the new shard's state when it created the request matches the shard's current state,
* or whether the request must be reprocessed taking into account the current shard states.
*
* Note that we are able to get away with a single number, instead of an array of target shard states,
* because we only allow splits in increments of 2x.
*
* Example 1:
* Suppose we are resharding an index from 2 -> 4 shards. While splitting a bulk request, the coordinator observes
* that target shards are not ready for indexing. So requests that are meant for shard 0 and 2 are bundled together,
* sent to shard 0 with “reshardSplitShardCountSummary” 2 in the request.
* Requests that are meant for shard 1 and 3 are bundled together,
* sent to shard 1 with “reshardSplitShardCountSummary” 2 in the request.
*
* Example 2:
* Suppose we are resharding an index from 4 -> 8 shards. While splitting a bulk request, the coordinator observes
* that source shard 0 has completed HANDOFF but source shards 1, 2, 3 have not completed handoff.
* So, the shard-bulk-request it sends to shard 0 and 4 has the "reshardSplitShardCountSummary" 8,
* while the shard-bulk-request it sends to shard 1,2,3 has the "reshardSplitShardCountSummary" 4.
* Note that in this case no shard-bulk-request is sent to shards 5, 6, 7 and the requests that were meant for these target shards
* are bundled together with and sent to their source shards.
*
* A value of 0 indicates an INVALID reshardSplitShardCountSummary. Hence, a request with INVALID reshardSplitShardCountSummary
* will be treated as a Summary mismatch on the source shard node.
* The reshardSplitShardCountSummary has been added to support in-place resharding.
* See {@link SplitShardCountSummary} for details.
*/
protected final SplitShardCountSummary reshardSplitShardCountSummary;

Expand Down Expand Up @@ -128,7 +101,13 @@ public ReplicationRequest(@Nullable ShardId shardId, SplitShardCountSummary resh
if (thinRead) {
this.reshardSplitShardCountSummary = reshardSplitShardCountSummary;
} else {
this.reshardSplitShardCountSummary = new SplitShardCountSummary(in);
if (in.getTransportVersion().supports(INDEX_RESHARD_SHARDCOUNT_SMALL)) {
this.reshardSplitShardCountSummary = SplitShardCountSummary.fromInt(in.readVInt());
} else if (in.getTransportVersion().supports(INDEX_RESHARD_SHARDCOUNT_SUMMARY)) {
this.reshardSplitShardCountSummary = SplitShardCountSummary.fromInt(in.readInt());
} else {
this.reshardSplitShardCountSummary = SplitShardCountSummary.UNSET;
}
}
}

Expand Down Expand Up @@ -257,7 +236,11 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeTimeValue(timeout);
out.writeString(index);
out.writeVLong(routedBasedOnClusterVersion);
reshardSplitShardCountSummary.writeTo(out);
if (out.getTransportVersion().supports(INDEX_RESHARD_SHARDCOUNT_SMALL)) {
out.writeVInt(reshardSplitShardCountSummary.asInt());
} else if (out.getTransportVersion().supports(INDEX_RESHARD_SHARDCOUNT_SUMMARY)) {
out.writeInt(reshardSplitShardCountSummary.asInt());
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,9 @@

package org.elasticsearch.cluster.routing;

import org.elasticsearch.TransportVersion;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.IndexReshardingMetadata;
import org.elasticsearch.cluster.metadata.IndexReshardingState;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;

import java.io.IOException;

/**
* The SplitShardCountSummary has been added to accommodate in-place index resharding.
Expand Down Expand Up @@ -57,12 +51,7 @@
* will be treated as a Summary mismatch on the source shard node.
*/

public class SplitShardCountSummary implements Writeable {
// superseded
private static final TransportVersion INDEX_RESHARD_SHARDCOUNT_SUMMARY = TransportVersion.fromName("index_reshard_shardcount_summary");
// bumped to use VInt instead of Int
private static final TransportVersion INDEX_RESHARD_SHARDCOUNT_SMALL = TransportVersion.fromName("index_reshard_shardcount_small");

public class SplitShardCountSummary {
public static final SplitShardCountSummary UNSET = new SplitShardCountSummary(0);

/**
Expand Down Expand Up @@ -137,39 +126,35 @@ private static SplitShardCountSummary getReshardSplitShardCountSummary(
return new SplitShardCountSummary(shardCount);
}

private final int shardCountSummary;
/**
* Construct a SplitShardCountSummary from an integer
* Used for deserialization.
*/
public static SplitShardCountSummary fromInt(int payload) {
return new SplitShardCountSummary(payload);
}

public SplitShardCountSummary(StreamInput in) throws IOException {
if (in.getTransportVersion().supports(INDEX_RESHARD_SHARDCOUNT_SMALL)) {
this.shardCountSummary = in.readVInt();
} else if (in.getTransportVersion().supports(INDEX_RESHARD_SHARDCOUNT_SUMMARY)) {
this.shardCountSummary = in.readInt();
} else {
this.shardCountSummary = UNSET.shardCountSummary;
}
/**
* Return an integer representation of this summary
* Used for serialization.
*/
public int asInt() {
return shardCountSummary;
}

private final int shardCountSummary;
Copy link
Preview

Copilot AI Oct 6, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[nitpick] The field declaration was moved after the method definitions. Consider keeping field declarations at the top of the class for better readability and consistency with Java conventions.

Copilot uses AI. Check for mistakes.


/**
* Returns whether this shard count summary is carrying an actual value or is UNSET
*/
public boolean isUnset() {
return this.shardCountSummary == UNSET.shardCountSummary;
}

// visible for testing
SplitShardCountSummary(int shardCountSummary) {
Copy link
Preview

Copilot AI Oct 6, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[nitpick] The constructor visibility was changed from package-private with a comment 'visible for testing' to package-private without comment. Consider adding back the visibility comment or making it private since it's now only used internally via the fromInt() factory method.

Suggested change
SplitShardCountSummary(int shardCountSummary) {
private SplitShardCountSummary(int shardCountSummary) {

Copilot uses AI. Check for mistakes.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I didn't mean to remove the comment. Thanks copilot!

this.shardCountSummary = shardCountSummary;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
if (out.getTransportVersion().supports(INDEX_RESHARD_SHARDCOUNT_SMALL)) {
out.writeVInt(shardCountSummary);
} else if (out.getTransportVersion().supports(INDEX_RESHARD_SHARDCOUNT_SUMMARY)) {
out.writeInt(shardCountSummary);
}
}

@Override
public boolean equals(Object other) {
if (this == other) {
Expand Down