Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.elasticsearch.cluster.metadata.ProjectMetadata;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.project.ProjectResolver;
import org.elasticsearch.cluster.routing.SearchShardRouting;
import org.elasticsearch.cluster.routing.ShardIterator;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.service.ClusterService;
Expand Down Expand Up @@ -109,17 +110,18 @@ protected void masterOperation(
}

Set<String> nodeIds = new HashSet<>();
List<ShardIterator> groupShardsIterator = clusterService.operationRouting()
List<SearchShardRouting> groupShardRouting = clusterService.operationRouting()
.searchShards(project, concreteIndices, routingMap, request.preference());
ShardRouting shard;
ClusterSearchShardsGroup[] groupResponses = new ClusterSearchShardsGroup[groupShardsIterator.size()];
ClusterSearchShardsGroup[] groupResponses = new ClusterSearchShardsGroup[groupShardRouting.size()];
int currentGroup = 0;
for (ShardIterator shardIt : groupShardsIterator) {
ShardId shardId = shardIt.shardId();
ShardRouting[] shardRoutings = new ShardRouting[shardIt.size()];
for (SearchShardRouting shardRouting : groupShardRouting) {
ShardIterator shardIterator = shardRouting.iterator();
ShardId shardId = shardIterator.shardId();
ShardRouting[] shardRoutings = new ShardRouting[shardIterator.size()];
int currentShard = 0;
shardIt.reset();
while ((shard = shardIt.nextOrNull()) != null) {
shardIterator.reset();
while ((shard = shardIterator.nextOrNull()) != null) {
shardRoutings[currentShard++] = shard;
nodeIds.add(shard.currentNodeId());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.project.ProjectResolver;
import org.elasticsearch.cluster.routing.SearchShardRouting;
import org.elasticsearch.cluster.routing.ShardIterator;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.service.ClusterService;
Expand Down Expand Up @@ -220,15 +221,17 @@ protected AnalyzeIndexDiskUsageResponse newResponse(
@Override
protected List<ShardIterator> shards(ClusterState clusterState, AnalyzeIndexDiskUsageRequest request, String[] concreteIndices) {
ProjectState project = projectResolver.getProjectState(clusterState);
final List<ShardIterator> groups = clusterService.operationRouting().searchShards(project, concreteIndices, null, null);
final List<SearchShardRouting> groups = clusterService.operationRouting().searchShards(project, concreteIndices, null, null);

for (ShardIterator group : groups) {
var shardIterators = new ArrayList<ShardIterator>(groups.size());
for (SearchShardRouting group : groups) {
// fails fast if any non-active groups
if (group.size() == 0) {
throw new NoShardAvailableActionException(group.shardId());
if (group.iterator().size() == 0) {
throw new NoShardAvailableActionException(group.iterator().shardId());
}
shardIterators.add(group.iterator());
}
return groups;
return shardIterators;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver.ResolvedExpression;
import org.elasticsearch.cluster.project.ProjectResolver;
import org.elasticsearch.cluster.routing.SearchShardRouting;
import org.elasticsearch.cluster.routing.ShardIterator;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.service.ClusterService;
Expand Down Expand Up @@ -180,7 +181,11 @@ protected List<ShardIterator> shards(ClusterState clusterState, ValidateQueryReq
routing,
request.indices()
);
return clusterService.operationRouting().searchShards(project, concreteIndices, routingMap, "_local");
return clusterService.operationRouting()
.searchShards(project, concreteIndices, routingMap, "_local")
.stream()
.map(SearchShardRouting::iterator)
.toList();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.elasticsearch.cluster.ProjectState;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.project.ProjectResolver;
import org.elasticsearch.cluster.routing.SearchShardRouting;
import org.elasticsearch.cluster.routing.ShardIterator;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.service.ClusterService;
Expand Down Expand Up @@ -105,16 +106,16 @@ public RequestDispatcher(
ProjectState project = projectResolver.getProjectState(clusterState);

for (String index : indices) {
final List<ShardIterator> shardIts;
final List<SearchShardRouting> searchShards;
try {
shardIts = clusterService.operationRouting().searchShards(project, new String[] { index }, null, null);
searchShards = clusterService.operationRouting().searchShards(project, new String[] { index }, null, null);
} catch (Exception e) {
onIndexFailure.accept(index, e);
continue;
}
final IndexSelector indexResult = new IndexSelector(
fieldCapsRequest.clusterAlias(),
shardIts,
searchShards,
fieldCapsRequest.indexFilter(),
nowInMillis,
coordinatorRewriteContextProvider
Expand Down Expand Up @@ -270,14 +271,15 @@ private static class IndexSelector {

IndexSelector(
String clusterAlias,
List<ShardIterator> shardIts,
List<SearchShardRouting> searchShards,
QueryBuilder indexFilter,
long nowInMillis,
CoordinatorRewriteContextProvider coordinatorRewriteContextProvider
) {
for (ShardIterator shardIt : shardIts) {
for (SearchShardRouting routing : searchShards) {
boolean canMatch = true;
final ShardId shardId = shardIt.shardId();
ShardIterator iterator = routing.iterator();
final ShardId shardId = iterator.shardId();
if (indexFilter != null && indexFilter instanceof MatchAllQueryBuilder == false) {
var coordinatorRewriteContext = coordinatorRewriteContextProvider.getCoordinatorRewriteContext(shardId.getIndex());
if (coordinatorRewriteContext != null) {
Expand All @@ -291,7 +293,7 @@ private static class IndexSelector {
}
}
if (canMatch) {
for (ShardRouting shard : shardIt) {
for (ShardRouting shard : iterator) {
nodeToShards.computeIfAbsent(shard.currentNodeId(), node -> new ArrayList<>()).add(shard);
}
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -710,7 +710,8 @@ protected final ShardSearchRequest buildShardSearchRequest(SearchShardIterator s
timeProvider.absoluteStartMillis(),
shardIt.getClusterAlias(),
shardIt.getSearchContextId(),
shardIt.getSearchContextKeepAlive()
shardIt.getSearchContextKeepAlive(),
shardIt.getReshardSplitShardCountSummary()
);
// if we already received a search result we can inform the shard that it
// can return a null response if the request rewrites to match none rather
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.elasticsearch.action.IndicesRequest;
import org.elasticsearch.action.OriginalIndices;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.cluster.routing.SplitShardCountSummary;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
Expand Down Expand Up @@ -65,6 +66,7 @@ public static class Shard implements Writeable {
private final ShardSearchContextId readerId;
private final TimeValue keepAlive;
private final long waitForCheckpoint;
private final SplitShardCountSummary reshardSplitShardCountSummary;

public Shard(
String[] indices,
Expand All @@ -74,7 +76,8 @@ public Shard(
float indexBoost,
ShardSearchContextId readerId,
TimeValue keepAlive,
long waitForCheckpoint
long waitForCheckpoint,
SplitShardCountSummary reshardSplitShardCountSummary
) {
this.indices = indices;
this.shardId = shardId;
Expand All @@ -85,6 +88,7 @@ public Shard(
this.keepAlive = keepAlive;
this.waitForCheckpoint = waitForCheckpoint;
assert keepAlive == null || readerId != null : "readerId: " + readerId + " keepAlive: " + keepAlive;
this.reshardSplitShardCountSummary = reshardSplitShardCountSummary;
}

public Shard(StreamInput in) throws IOException {
Expand All @@ -97,6 +101,11 @@ public Shard(StreamInput in) throws IOException {
keepAlive = in.readOptionalTimeValue();
waitForCheckpoint = in.readLong();
assert keepAlive == null || readerId != null : "readerId: " + readerId + " keepAlive: " + keepAlive;
if (in.getTransportVersion().supports(ShardSearchRequest.SHARD_SEARCH_REQUEST_RESHARD_SHARD_COUNT_SUMMARY)) {
reshardSplitShardCountSummary = SplitShardCountSummary.fromInt(in.readVInt());
} else {
reshardSplitShardCountSummary = SplitShardCountSummary.UNSET;
}
}

@Override
Expand All @@ -109,6 +118,9 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeOptionalWriteable(readerId);
out.writeOptionalTimeValue(keepAlive);
out.writeLong(waitForCheckpoint);
if (out.getTransportVersion().supports(ShardSearchRequest.SHARD_SEARCH_REQUEST_RESHARD_SHARD_COUNT_SUMMARY)) {
out.writeVInt(reshardSplitShardCountSummary.asInt());
}
}

public int getShardRequestIndex() {
Expand Down Expand Up @@ -234,7 +246,8 @@ public ShardSearchRequest createShardSearchRequest(Shard r) {
r.keepAlive,
r.waitForCheckpoint,
waitForCheckpointsTimeout,
false
false,
r.reshardSplitShardCountSummary
);
shardSearchRequest.setParentTask(getParentTask());
return shardSearchRequest;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -425,7 +425,8 @@ private CanMatchNodeRequest.Shard buildShardLevelRequest(SearchShardIterator sha
indexBoost,
shardIt.getSearchContextId(),
shardIt.getSearchContextKeepAlive(),
ShardSearchRequest.computeWaitForCheckpoint(request.getWaitForCheckpoints(), shardIt.shardId(), shardRequestIndex)
ShardSearchRequest.computeWaitForCheckpoint(request.getWaitForCheckpoints(), shardIt.shardId(), shardRequestIndex),
shardIt.getReshardSplitShardCountSummary()
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.routing.SplitShardCountSummary;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.RecyclerBytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;
Expand Down Expand Up @@ -354,17 +355,25 @@ public IndicesOptions indicesOptions() {
}
}

private record ShardToQuery(float boost, String[] originalIndices, int shardIndex, ShardId shardId, ShardSearchContextId contextId)
implements
Writeable {
private record ShardToQuery(
float boost,
String[] originalIndices,
int shardIndex,
ShardId shardId,
ShardSearchContextId contextId,
SplitShardCountSummary reshardSplitShardCountSummary
) implements Writeable {

static ShardToQuery readFrom(StreamInput in) throws IOException {
return new ShardToQuery(
in.readFloat(),
in.readStringArray(),
in.readVInt(),
new ShardId(in),
in.readOptionalWriteable(ShardSearchContextId::new)
in.readOptionalWriteable(ShardSearchContextId::new),
in.getTransportVersion().supports(ShardSearchRequest.SHARD_SEARCH_REQUEST_RESHARD_SHARD_COUNT_SUMMARY)
? SplitShardCountSummary.fromInt(in.readVInt())
: SplitShardCountSummary.UNSET
);
}

Expand All @@ -375,6 +384,9 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeVInt(shardIndex);
shardId.writeTo(out);
out.writeOptionalWriteable(contextId);
if (out.getTransportVersion().supports(ShardSearchRequest.SHARD_SEARCH_REQUEST_RESHARD_SHARD_COUNT_SUMMARY)) {
out.writeVInt(reshardSplitShardCountSummary.asInt());
}
}
}

Expand Down Expand Up @@ -454,7 +466,8 @@ protected void doRun(Map<SearchShardIterator, Integer> shardIndexMap) {
getOriginalIndices(shardIndex).indices(),
shardIndex,
routing.getShardId(),
shardRoutings.getSearchContextId()
shardRoutings.getSearchContextId(),
shardRoutings.getReshardSplitShardCountSummary()
)
);
var filterForAlias = aliasFilter.getOrDefault(indexUUID, AliasFilter.EMPTY);
Expand Down Expand Up @@ -651,7 +664,8 @@ private static ShardSearchRequest buildShardSearchRequest(
SearchRequest searchRequest,
int totalShardCount,
long absoluteStartMillis,
boolean hasResponse
boolean hasResponse,
SplitShardCountSummary reshardSplitShardCountSummary
) {
ShardSearchRequest shardRequest = new ShardSearchRequest(
originalIndices,
Expand All @@ -664,7 +678,8 @@ private static ShardSearchRequest buildShardSearchRequest(
absoluteStartMillis,
clusterAlias,
searchContextId,
searchContextKeepAlive
searchContextKeepAlive,
reshardSplitShardCountSummary
);
// if we already received a search result we can inform the shard that it
// can return a null response if the request rewrites to match none rather
Expand Down Expand Up @@ -702,7 +717,8 @@ private static void executeShardTasks(QueryPerNodeState state) {
searchRequest,
nodeQueryRequest.totalShards,
nodeQueryRequest.absoluteStartMillis,
state.hasResponse.getAcquire()
state.hasResponse.getAcquire(),
shardToQuery.reshardSplitShardCountSummary
)
),
state.task,
Expand Down
Loading