Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.elasticsearch.cluster.metadata.ProjectMetadata;
import org.elasticsearch.cluster.project.ProjectResolver;
import org.elasticsearch.cluster.routing.IndexRouting;
import org.elasticsearch.cluster.routing.SplitShardCountSummary;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.collect.Iterators;
import org.elasticsearch.common.util.concurrent.AtomicArray;
Expand Down Expand Up @@ -403,9 +404,9 @@ private void executeBulkRequestsByShard(

// Get effective shardCount for shardId and pass it on as parameter to new BulkShardRequest
var indexMetadata = project.index(shardId.getIndexName());
int reshardSplitShardCountSummary = 0;
SplitShardCountSummary reshardSplitShardCountSummary = SplitShardCountSummary.UNSET;
if (indexMetadata != null) {
reshardSplitShardCountSummary = indexMetadata.getReshardSplitShardCountSummaryForIndexing(shardId.getId());
reshardSplitShardCountSummary = SplitShardCountSummary.forIndexing(indexMetadata, shardId.getId());
}
BulkShardRequest bulkShardRequest = new BulkShardRequest(
shardId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import org.elasticsearch.action.support.replication.ReplicationRequest;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.cluster.metadata.InferenceFieldMetadata;
import org.elasticsearch.cluster.routing.SplitShardCountSummary;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.util.set.Sets;
Expand Down Expand Up @@ -50,21 +51,26 @@ public BulkShardRequest(StreamInput in) throws IOException {
}
}

public BulkShardRequest(ShardId shardId, int reshardSplitShardCountSummary, RefreshPolicy refreshPolicy, BulkItemRequest[] items) {
public BulkShardRequest(
ShardId shardId,
SplitShardCountSummary reshardSplitShardCountSummary,
RefreshPolicy refreshPolicy,
BulkItemRequest[] items
) {
this(shardId, reshardSplitShardCountSummary, refreshPolicy, items, false);
}

public BulkShardRequest(ShardId shardId, RefreshPolicy refreshPolicy, BulkItemRequest[] items) {
this(shardId, 0, refreshPolicy, items, false);
this(shardId, SplitShardCountSummary.UNSET, refreshPolicy, items, false);
}

public BulkShardRequest(ShardId shardId, RefreshPolicy refreshPolicy, BulkItemRequest[] items, boolean isSimulated) {
this(shardId, 0, refreshPolicy, items, isSimulated);
this(shardId, SplitShardCountSummary.UNSET, refreshPolicy, items, isSimulated);
}

public BulkShardRequest(
ShardId shardId,
int reshardSplitShardCountSummary,
SplitShardCountSummary reshardSplitShardCountSummary,
RefreshPolicy refreshPolicy,
BulkItemRequest[] items,
boolean isSimulated
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.cluster.routing.SplitShardCountSummary;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.core.Nullable;
Expand Down Expand Up @@ -47,7 +48,7 @@ public ReplicatedWriteRequest(@Nullable ShardId shardId) {
super(shardId);
}

public ReplicatedWriteRequest(@Nullable ShardId shardId, int reshardSplitShardCountSummary) {
public ReplicatedWriteRequest(@Nullable ShardId shardId, SplitShardCountSummary reshardSplitShardCountSummary) {
super(shardId, reshardSplitShardCountSummary);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,14 @@

package org.elasticsearch.action.support.replication;

import org.elasticsearch.TransportVersion;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.IndicesRequest;
import org.elasticsearch.action.LegacyActionRequest;
import org.elasticsearch.action.admin.indices.refresh.TransportShardRefreshAction;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.cluster.routing.SplitShardCountSummary;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.core.Nullable;
Expand All @@ -38,11 +38,6 @@ public abstract class ReplicationRequest<Request extends ReplicationRequest<Requ
implements
IndicesRequest {

// superseded
private static final TransportVersion INDEX_RESHARD_SHARDCOUNT_SUMMARY = TransportVersion.fromName("index_reshard_shardcount_summary");
// bumped to use VInt instead of Int
private static final TransportVersion INDEX_RESHARD_SHARDCOUNT_SMALL = TransportVersion.fromName("index_reshard_shardcount_small");

public static final TimeValue DEFAULT_TIMEOUT = TimeValue.timeValueMinutes(1);

/**
Expand Down Expand Up @@ -92,7 +87,7 @@ public abstract class ReplicationRequest<Request extends ReplicationRequest<Requ
* A value of 0 indicates an INVALID reshardSplitShardCountSummary. Hence, a request with INVALID reshardSplitShardCountSummary
* will be treated as a Summary mismatch on the source shard node.
*/
protected final int reshardSplitShardCountSummary;
protected final SplitShardCountSummary reshardSplitShardCountSummary;

/**
* The number of shard copies that must be active before proceeding with the replication action.
Expand All @@ -106,10 +101,11 @@ public ReplicationRequest(StreamInput in) throws IOException {
}

public ReplicationRequest(@Nullable ShardId shardId, StreamInput in) throws IOException {
this(shardId, 0, in);
this(shardId, SplitShardCountSummary.UNSET, in);
}

public ReplicationRequest(@Nullable ShardId shardId, int reshardSplitShardCountSummary, StreamInput in) throws IOException {
public ReplicationRequest(@Nullable ShardId shardId, SplitShardCountSummary reshardSplitShardCountSummary, StreamInput in)
throws IOException {
super(in);
final boolean thinRead = shardId != null;
if (thinRead) {
Expand All @@ -132,27 +128,21 @@ public ReplicationRequest(@Nullable ShardId shardId, int reshardSplitShardCountS
if (thinRead) {
this.reshardSplitShardCountSummary = reshardSplitShardCountSummary;
} else {
if (in.getTransportVersion().supports(INDEX_RESHARD_SHARDCOUNT_SMALL)) {
this.reshardSplitShardCountSummary = in.readVInt();
} else if (in.getTransportVersion().supports(INDEX_RESHARD_SHARDCOUNT_SUMMARY)) {
this.reshardSplitShardCountSummary = in.readInt();
} else {
this.reshardSplitShardCountSummary = 0;
}
this.reshardSplitShardCountSummary = new SplitShardCountSummary(in);
}
}

/**
* Creates a new request with resolved shard id
*/
public ReplicationRequest(@Nullable ShardId shardId) {
this(shardId, 0);
this(shardId, SplitShardCountSummary.UNSET);
}

/**
* Creates a new request with resolved shard id and reshardSplitShardCountSummary
*/
public ReplicationRequest(@Nullable ShardId shardId, int reshardSplitShardCountSummary) {
public ReplicationRequest(@Nullable ShardId shardId, SplitShardCountSummary reshardSplitShardCountSummary) {
this.index = shardId == null ? null : shardId.getIndexName();
this.shardId = shardId;
this.timeout = DEFAULT_TIMEOUT;
Expand Down Expand Up @@ -209,7 +199,7 @@ public ShardId shardId() {
* @return The effective shard count as seen by the coordinator when creating this request.
* can be 0 if this has not yet been resolved.
*/
public int reshardSplitShardCountSummary() {
public SplitShardCountSummary reshardSplitShardCountSummary() {
return reshardSplitShardCountSummary;
}

Expand Down Expand Up @@ -267,11 +257,7 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeTimeValue(timeout);
out.writeString(index);
out.writeVLong(routedBasedOnClusterVersion);
if (out.getTransportVersion().supports(INDEX_RESHARD_SHARDCOUNT_SMALL)) {
out.writeVInt(reshardSplitShardCountSummary);
} else if (out.getTransportVersion().supports(INDEX_RESHARD_SHARDCOUNT_SUMMARY)) {
out.writeInt(reshardSplitShardCountSummary);
}
reshardSplitShardCountSummary.writeTo(out);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.AllocationId;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.SplitShardCountSummary;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
Expand Down Expand Up @@ -469,11 +470,11 @@ void runWithPrimaryShardReference(final PrimaryShardReference primaryShardRefere
throw blockException;
}

int reshardSplitShardCountSummary = primaryRequest.getRequest().reshardSplitShardCountSummary();
assert (reshardSplitShardCountSummary == 0
|| reshardSplitShardCountSummary == indexMetadata.getReshardSplitShardCountSummaryForIndexing(
primaryRequest.getRequest().shardId().getId()
));
SplitShardCountSummary reshardSplitShardCountSummary = primaryRequest.getRequest().reshardSplitShardCountSummary();
assert reshardSplitShardCountSummary.isUnset()
|| reshardSplitShardCountSummary.equals(
SplitShardCountSummary.forIndexing(indexMetadata, primaryRequest.getRequest().shardId().getId())
);
if (primaryShardReference.isRelocated()) {
primaryShardReference.close(); // release shard operation lock as soon as possible
setPhase(replicationTask, "primary_delegation");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1196,71 +1196,6 @@ public int getNumberOfShards() {
return numberOfShards;
}

/**
* This method is used in the context of the resharding feature.
* Given a {@code shardId} and {@code minShardState} i.e. the minimum target shard state required for
* an operation to be routed to target shards,
* this method returns the "effective" shard count as seen by this IndexMetadata.
*
* The reshardSplitShardCountSummary tells us whether the coordinator routed requests to the source shard or
* to both source and target shards. Requests are routed to both source and target shards
* once the target shards are ready for an operation.
*
* The coordinator routes requests to source and target shards, based on its cluster state view of the state of shards
* undergoing a resharding operation. This method is used to populate a field in the shard level requests sent to
* source and target shards, as a proxy for the cluster state version. The same calculation is then done at the source shard
* to verify if the coordinator and source node's view of the resharding state have a mismatch.
* See {@link org.elasticsearch.action.support.replication.ReplicationRequest#reshardSplitShardCountSummary}
* for a detailed description of how this value is used.
*
* @param shardId Input shardId for which we want to calculate the effective shard count
* @param minShardState Minimum target shard state required for the target to be considered ready
* @return Effective shard count as seen by an operation using this IndexMetadata
*/
private int getReshardSplitShardCountSummary(int shardId, IndexReshardingState.Split.TargetShardState minShardState) {
assert shardId >= 0 && shardId < getNumberOfShards() : "shardId is out of bounds";
int shardCount = getNumberOfShards();
if (reshardingMetadata != null) {
if (reshardingMetadata.getSplit().isTargetShard(shardId)) {
int sourceShardId = reshardingMetadata.getSplit().sourceShard(shardId);
// Requests cannot be routed to target shards until they are ready
assert reshardingMetadata.getSplit().allTargetStatesAtLeast(sourceShardId, minShardState) : "unexpected target state";
shardCount = reshardingMetadata.getSplit().shardCountAfter();
} else if (reshardingMetadata.getSplit().isSourceShard(shardId)) {
if (reshardingMetadata.getSplit().allTargetStatesAtLeast(shardId, minShardState)) {
shardCount = reshardingMetadata.getSplit().shardCountAfter();
} else {
shardCount = reshardingMetadata.getSplit().shardCountBefore();
}
}
}
return shardCount;
}

/**
* This method is used in the context of the resharding feature.
* Given a {@code shardId}, this method returns the "effective" shard count
* as seen by this IndexMetadata, for indexing operations.
*
* See {@code getReshardSplitShardCountSummary} for more details.
* @param shardId Input shardId for which we want to calculate the effective shard count
*/
public int getReshardSplitShardCountSummaryForIndexing(int shardId) {
return (getReshardSplitShardCountSummary(shardId, IndexReshardingState.Split.TargetShardState.HANDOFF));
}

/**
* This method is used in the context of the resharding feature.
* Given a {@code shardId}, this method returns the "effective" shard count
* as seen by this IndexMetadata, for search operations.
*
* See {@code getReshardSplitShardCount} for more details.
* @param shardId Input shardId for which we want to calculate the effective shard count
*/
public int getReshardSplitShardCountSummaryForSearch(int shardId) {
return (getReshardSplitShardCountSummary(shardId, IndexReshardingState.Split.TargetShardState.SPLIT));
}

public int getNumberOfReplicas() {
return numberOfReplicas;
}
Expand Down
Loading