@@ -41,7 +41,7 @@
import org.elasticsearch.cluster.metadata.ProjectMetadata;
import org.elasticsearch.cluster.project.ProjectResolver;
import org.elasticsearch.cluster.routing.IndexRouting;
-import org.elasticsearch.cluster.routing.SplitShardCountSummary;
+import org.elasticsearch.cluster.routing.IndexSplitShardCountSummary;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.collect.Iterators;
import org.elasticsearch.common.util.concurrent.AtomicArray;
@@ -404,9 +404,9 @@ private void executeBulkRequestsByShard(

// Get effective shardCount for shardId and pass it on as parameter to new BulkShardRequest
var indexMetadata = project.index(shardId.getIndexName());
-SplitShardCountSummary reshardSplitShardCountSummary = SplitShardCountSummary.UNSET;
+IndexSplitShardCountSummary reshardSplitShardCountSummary = IndexSplitShardCountSummary.UNSET;
if (indexMetadata != null) {
-reshardSplitShardCountSummary = SplitShardCountSummary.forIndexing(indexMetadata, shardId.getId());
+reshardSplitShardCountSummary = IndexSplitShardCountSummary.fromMetadata(indexMetadata, shardId.getId());
}
BulkShardRequest bulkShardRequest = new BulkShardRequest(
shardId,
@@ -18,7 +18,7 @@
import org.elasticsearch.action.support.replication.ReplicationRequest;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.cluster.metadata.InferenceFieldMetadata;
-import org.elasticsearch.cluster.routing.SplitShardCountSummary;
+import org.elasticsearch.cluster.routing.IndexSplitShardCountSummary;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.util.set.Sets;
@@ -53,24 +53,24 @@ public BulkShardRequest(StreamInput in) throws IOException {

public BulkShardRequest(
ShardId shardId,
-SplitShardCountSummary reshardSplitShardCountSummary,
+IndexSplitShardCountSummary reshardSplitShardCountSummary,
RefreshPolicy refreshPolicy,
BulkItemRequest[] items
) {
this(shardId, reshardSplitShardCountSummary, refreshPolicy, items, false);
}

public BulkShardRequest(ShardId shardId, RefreshPolicy refreshPolicy, BulkItemRequest[] items) {
-this(shardId, SplitShardCountSummary.UNSET, refreshPolicy, items, false);
+this(shardId, IndexSplitShardCountSummary.UNSET, refreshPolicy, items, false);
}

public BulkShardRequest(ShardId shardId, RefreshPolicy refreshPolicy, BulkItemRequest[] items, boolean isSimulated) {
-this(shardId, SplitShardCountSummary.UNSET, refreshPolicy, items, isSimulated);
+this(shardId, IndexSplitShardCountSummary.UNSET, refreshPolicy, items, isSimulated);
}

public BulkShardRequest(
ShardId shardId,
-SplitShardCountSummary reshardSplitShardCountSummary,
+IndexSplitShardCountSummary reshardSplitShardCountSummary,
RefreshPolicy refreshPolicy,
BulkItemRequest[] items,
boolean isSimulated
@@ -13,7 +13,7 @@
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.support.WriteRequest;
-import org.elasticsearch.cluster.routing.SplitShardCountSummary;
+import org.elasticsearch.cluster.routing.IndexSplitShardCountSummary;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.core.Nullable;
@@ -48,7 +48,7 @@ public ReplicatedWriteRequest(@Nullable ShardId shardId) {
super(shardId);
}

-public ReplicatedWriteRequest(@Nullable ShardId shardId, SplitShardCountSummary reshardSplitShardCountSummary) {
+public ReplicatedWriteRequest(@Nullable ShardId shardId, IndexSplitShardCountSummary reshardSplitShardCountSummary) {
super(shardId, reshardSplitShardCountSummary);
}

@@ -16,7 +16,7 @@
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.action.support.IndicesOptions;
-import org.elasticsearch.cluster.routing.SplitShardCountSummary;
+import org.elasticsearch.cluster.routing.IndexSplitShardCountSummary;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.core.Nullable;
@@ -52,42 +52,9 @@ public abstract class ReplicationRequest<Request extends ReplicationRequest<Requ

/**
* The reshardSplitShardCountSummary has been added to accommodate the Resharding feature.
-* This is populated when the coordinator is deciding which shards a request applies to.
-* For example, {@link org.elasticsearch.action.bulk.BulkOperation} splits
-* an incoming bulk request into shard level {@link org.elasticsearch.action.bulk.BulkShardRequest}
-* based on its cluster state view of the number of shards that are ready for indexing.
-* The purpose of this metadata is to reconcile the cluster state visible at the coordinating
-* node with that visible at the source shard node. (w.r.t resharding).
-* When an index is being split, there is a point in time when the newly created shard (target shard)
-* takes over its portion of the document space from the original shard (source shard).
-* Although the handoff is atomic at the original (source shard) and new shards (target shard),
-* there is a window of time between the coordinating node creating a shard request and the shard receiving and processing it.
-* This field is used by the original shard (source shard) when it processes the request to detect whether
-* the coordinator's view of the new shard's state when it created the request matches the shard's current state,
-* or whether the request must be reprocessed taking into account the current shard states.
-*
-* Note that we are able to get away with a single number, instead of an array of target shard states,
-* because we only allow splits in increments of 2x.
-*
-* Example 1:
-* Suppose we are resharding an index from 2 -> 4 shards. While splitting a bulk request, the coordinator observes
-* that target shards are not ready for indexing. So requests that are meant for shard 0 and 2 are bundled together,
-* sent to shard 0 with “reshardSplitShardCountSummary” 2 in the request.
-* Requests that are meant for shard 1 and 3 are bundled together,
-* sent to shard 1 with “reshardSplitShardCountSummary” 2 in the request.
-*
-* Example 2:
-* Suppose we are resharding an index from 4 -> 8 shards. While splitting a bulk request, the coordinator observes
-* that source shard 0 has completed HANDOFF but source shards 1, 2, 3 have not completed handoff.
-* So, the shard-bulk-request it sends to shard 0 and 4 has the "reshardSplitShardCountSummary" 8,
-* while the shard-bulk-request it sends to shard 1,2,3 has the "reshardSplitShardCountSummary" 4.
-* Note that in this case no shard-bulk-request is sent to shards 5, 6, 7 and the requests that were meant for these target shards
-* are bundled together with and sent to their source shards.
-*
-* A value of 0 indicates an INVALID reshardSplitShardCountSummary. Hence, a request with INVALID reshardSplitShardCountSummary
-* will be treated as a Summary mismatch on the source shard node.
+* See {@link IndexSplitShardCountSummary} for details.
*/
-protected final SplitShardCountSummary reshardSplitShardCountSummary;
+protected final IndexSplitShardCountSummary reshardSplitShardCountSummary;

/**
* The number of shard copies that must be active before proceeding with the replication action.
@@ -101,10 +68,10 @@ public ReplicationRequest(StreamInput in) throws IOException {
}

public ReplicationRequest(@Nullable ShardId shardId, StreamInput in) throws IOException {
-this(shardId, SplitShardCountSummary.UNSET, in);
+this(shardId, IndexSplitShardCountSummary.UNSET, in);
}

-public ReplicationRequest(@Nullable ShardId shardId, SplitShardCountSummary reshardSplitShardCountSummary, StreamInput in)
+public ReplicationRequest(@Nullable ShardId shardId, IndexSplitShardCountSummary reshardSplitShardCountSummary, StreamInput in)
throws IOException {
super(in);
final boolean thinRead = shardId != null;
@@ -128,21 +95,21 @@ public ReplicationRequest(@Nullable ShardId shardId, SplitShardCountSummary resh
if (thinRead) {
this.reshardSplitShardCountSummary = reshardSplitShardCountSummary;
} else {
-this.reshardSplitShardCountSummary = new SplitShardCountSummary(in);
+this.reshardSplitShardCountSummary = new IndexSplitShardCountSummary(in);
}
}

/**
* Creates a new request with resolved shard id
*/
public ReplicationRequest(@Nullable ShardId shardId) {
-this(shardId, SplitShardCountSummary.UNSET);
+this(shardId, IndexSplitShardCountSummary.UNSET);
}

/**
* Creates a new request with resolved shard id and reshardSplitShardCountSummary
*/
-public ReplicationRequest(@Nullable ShardId shardId, SplitShardCountSummary reshardSplitShardCountSummary) {
+public ReplicationRequest(@Nullable ShardId shardId, IndexSplitShardCountSummary reshardSplitShardCountSummary) {
this.index = shardId == null ? null : shardId.getIndexName();
this.shardId = shardId;
this.timeout = DEFAULT_TIMEOUT;
@@ -199,7 +166,7 @@ public ShardId shardId() {
* @return The effective shard count as seen by the coordinator when creating this request.
* can be 0 if this has not yet been resolved.
*/
-public SplitShardCountSummary reshardSplitShardCountSummary() {
+public IndexSplitShardCountSummary reshardSplitShardCountSummary() {
return reshardSplitShardCountSummary;
}

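To make the examples in the removed javadoc above concrete, here is a minimal, self-contained sketch of the idea (hypothetical names, not the Elasticsearch implementation): because splits happen in 2x increments, each source shard has exactly one target shard, so a single number suffices, and the post-split shard count is reported only once that target has reached the required state.

final class ReshardSummaryExample {
    // Toy model: report the post-split count only once the source shard's single
    // target (at sourceShardId + oldShardCount) has reached the required state,
    // e.g. HANDOFF for indexing; otherwise report the pre-split count.
    static int summaryForIndexing(int oldShardCount, int newShardCount, boolean targetHandedOff) {
        return targetHandedOff ? newShardCount : oldShardCount;
    }

    public static void main(String[] args) {
        // Example 2 above: resharding 4 -> 8, only source shard 0 has completed
        // handoff, so its requests carry summary 8 while shards 1-3 carry 4.
        System.out.println(summaryForIndexing(4, 8, true));   // 8 (source shard 0)
        System.out.println(summaryForIndexing(4, 8, false));  // 4 (source shards 1, 2, 3)
    }
}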
@@ -32,8 +32,8 @@
import org.elasticsearch.cluster.metadata.ProjectMetadata;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.AllocationId;
+import org.elasticsearch.cluster.routing.IndexSplitShardCountSummary;
import org.elasticsearch.cluster.routing.ShardRouting;
-import org.elasticsearch.cluster.routing.SplitShardCountSummary;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
@@ -470,10 +470,10 @@ void runWithPrimaryShardReference(final PrimaryShardReference primaryShardRefere
throw blockException;
}

-SplitShardCountSummary reshardSplitShardCountSummary = primaryRequest.getRequest().reshardSplitShardCountSummary();
+IndexSplitShardCountSummary reshardSplitShardCountSummary = primaryRequest.getRequest().reshardSplitShardCountSummary();
assert reshardSplitShardCountSummary.isUnset()
|| reshardSplitShardCountSummary.equals(
-SplitShardCountSummary.forIndexing(indexMetadata, primaryRequest.getRequest().shardId().getId())
+IndexSplitShardCountSummary.fromMetadata(indexMetadata, primaryRequest.getRequest().shardId().getId())
);
if (primaryShardReference.isRelocated()) {
primaryShardReference.close(); // release shard operation lock as soon as possible
@@ -0,0 +1,68 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.cluster.routing;

import org.elasticsearch.TransportVersion;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.IndexReshardingState;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;

import java.io.IOException;

public class IndexSplitShardCountSummary extends SplitShardCountSummary {
// superseded
private static final TransportVersion INDEX_RESHARD_SHARDCOUNT_SUMMARY = TransportVersion.fromName("index_reshard_shardcount_summary");
// bumped to use VInt instead of Int
private static final TransportVersion INDEX_RESHARD_SHARDCOUNT_SMALL = TransportVersion.fromName("index_reshard_shardcount_small");

public static final IndexSplitShardCountSummary UNSET = new IndexSplitShardCountSummary(UNSET_VALUE);

public IndexSplitShardCountSummary(StreamInput in) throws IOException {
super(readShardCountSummary(in));
}

/**
* Given {@code IndexMetadata} and a {@code shardId}, this method returns the "effective" shard count
* as seen by this IndexMetadata, for indexing operations.
*
* See {@code getReshardSplitShardCountSummary} for more details.
* @param indexMetadata IndexMetadata of the shard for which we want to calculate the effective shard count
* @param shardId Input shardId for which we want to calculate the effective shard count
*/
public static IndexSplitShardCountSummary fromMetadata(IndexMetadata indexMetadata, int shardId) {
return new IndexSplitShardCountSummary(
getReshardSplitShardCountSummary(indexMetadata, shardId, IndexReshardingState.Split.TargetShardState.HANDOFF)
);
}

@Override
public void writeTo(StreamOutput out) throws IOException {
if (out.getTransportVersion().supports(INDEX_RESHARD_SHARDCOUNT_SMALL)) {
out.writeVInt(shardCountSummary);
} else if (out.getTransportVersion().supports(INDEX_RESHARD_SHARDCOUNT_SUMMARY)) {
out.writeInt(shardCountSummary);
}
}

private static int readShardCountSummary(StreamInput in) throws IOException {
if (in.getTransportVersion().supports(INDEX_RESHARD_SHARDCOUNT_SMALL)) {
return in.readVInt();
} else if (in.getTransportVersion().supports(INDEX_RESHARD_SHARDCOUNT_SUMMARY)) {
return in.readInt();
} else {
return UNSET_VALUE;
}
}

IndexSplitShardCountSummary(int shardCountSummary) {
super(shardCountSummary);
}
}
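Read together with the BulkOperation and TransportReplicationAction hunks above, the intended flow appears to be: the coordinator derives the summary from its own view of the index metadata (leaving it UNSET when the metadata is unavailable, just as the reader falls back to UNSET_VALUE when the peer predates both transport versions), and the primary re-derives it from its metadata to detect a stale coordinator view. The fragment below only restates those call sites from this diff and is not a compilable unit on its own; indexMetadata, shardId, and summary stand in for the local variables used there.

// Coordinator side (BulkOperation): resolve the summary from the coordinator's
// cluster-state view, falling back to UNSET when the index metadata is missing.
IndexSplitShardCountSummary summary = indexMetadata != null
    ? IndexSplitShardCountSummary.fromMetadata(indexMetadata, shardId.getId())
    : IndexSplitShardCountSummary.UNSET;

// Primary side (TransportReplicationAction): a summary that was set is expected
// to match what the primary derives from its own index metadata.
assert summary.isUnset()
    || summary.equals(IndexSplitShardCountSummary.fromMetadata(indexMetadata, shardId.getId()));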
@@ -0,0 +1,61 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.cluster.routing;

import org.elasticsearch.TransportVersion;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.IndexReshardingState;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;

import java.io.IOException;

public class SearchSplitShardCountSummary extends SplitShardCountSummary {
private static final TransportVersion SEARCH_RESHARD_SHARDCOUNT_SUMMARY = TransportVersion.fromName(
"search_reshard_shardcount_summary"
);

public SearchSplitShardCountSummary(StreamInput in) throws IOException {
super(readShardCountSummary(in));
}

/**
* Given {@code IndexMetadata} and a {@code shardId}, this method returns the "effective" shard count
* as seen by this IndexMetadata, for search operations.
*
* See {@code getReshardSplitShardCountSummary} for more details.
* @param indexMetadata IndexMetadata of the shard for which we want to calculate the effective shard count
* @param shardId Input shardId for which we want to calculate the effective shard count
*/
public static SearchSplitShardCountSummary fromMetadata(IndexMetadata indexMetadata, int shardId) {
return new SearchSplitShardCountSummary(
getReshardSplitShardCountSummary(indexMetadata, shardId, IndexReshardingState.Split.TargetShardState.SPLIT)
);
}

private static int readShardCountSummary(StreamInput in) throws IOException {
if (in.getTransportVersion().supports(SEARCH_RESHARD_SHARDCOUNT_SUMMARY)) {
return in.readVInt();
} else {
return UNSET_VALUE;
}
}

@Override
public void writeTo(StreamOutput out) throws IOException {
if (out.getTransportVersion().supports(SEARCH_RESHARD_SHARDCOUNT_SUMMARY)) {
out.writeVInt(shardCountSummary);
}
}

SearchSplitShardCountSummary(int shardCountSummary) {
super(shardCountSummary);
}
}
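For contrast with IndexSplitShardCountSummary above: judging from the target state each subclass passes to getReshardSplitShardCountSummary, the indexing summary treats a target shard as ready once it reaches HANDOFF, while the search summary waits for SPLIT, and the two are gated on different transport versions on the wire. A short illustrative fragment (variable names are placeholders, not code from this PR):

// Indexing path: the summary reflects the post-split count once the target shard
// has reached HANDOFF; serialised as a fixed int or VInt depending on version.
IndexSplitShardCountSummary indexingSummary = IndexSplitShardCountSummary.fromMetadata(indexMetadata, shardId.getId());

// Search path: the summary reflects the post-split count only once the target
// shard has reached SPLIT; serialised as a VInt behind a single transport version.
SearchSplitShardCountSummary searchSummary = SearchSplitShardCountSummary.fromMetadata(indexMetadata, shardId.getId());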