-
Notifications
You must be signed in to change notification settings - Fork 25.6k
Do not use split shards for search and refresh if they are not ready #132042
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 6 commits
4f9584f
4037ec4
decc2a9
6b23bf3
2e612be
77a3ae2
6ae508a
820ce80
96a0e1d
328bd1b
2bec0ef
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -22,12 +22,13 @@ | |
import org.elasticsearch.action.support.broadcast.BroadcastShardOperationFailedException; | ||
import org.elasticsearch.client.internal.node.NodeClient; | ||
import org.elasticsearch.cluster.ClusterState; | ||
import org.elasticsearch.cluster.ProjectState; | ||
import org.elasticsearch.cluster.metadata.IndexMetadata; | ||
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; | ||
import org.elasticsearch.cluster.metadata.ProjectMetadata; | ||
import org.elasticsearch.cluster.project.ProjectResolver; | ||
import org.elasticsearch.cluster.routing.IndexRoutingTable; | ||
import org.elasticsearch.cluster.routing.RoutingTable; | ||
import org.elasticsearch.cluster.routing.IndexShardRoutingTable; | ||
import org.elasticsearch.cluster.routing.OperationRouting; | ||
import org.elasticsearch.cluster.service.ClusterService; | ||
import org.elasticsearch.common.io.stream.Writeable; | ||
import org.elasticsearch.common.util.concurrent.EsExecutors; | ||
|
@@ -39,6 +40,7 @@ | |
|
||
import java.util.ArrayList; | ||
import java.util.Arrays; | ||
import java.util.Iterator; | ||
import java.util.List; | ||
import java.util.Map; | ||
import java.util.concurrent.Executor; | ||
|
@@ -99,7 +101,8 @@ public void accept(ActionListener<Response> listener) { | |
|
||
final ClusterState clusterState = clusterService.state(); | ||
final ProjectMetadata project = projectResolver.getProjectMetadata(clusterState); | ||
final List<ShardId> shards = shards(request, project, clusterState.routingTable(project.id())); | ||
final ProjectState projectState = projectResolver.getProjectState(clusterState); | ||
final List<ShardId> shards = shards(request, projectState); | ||
final Map<String, IndexMetadata> indexMetadataByName = project.indices(); | ||
|
||
try (var refs = new RefCountingRunnable(() -> finish(listener))) { | ||
|
@@ -185,17 +188,17 @@ protected void shardExecute(Task task, Request request, ShardId shardId, ActionL | |
/** | ||
* @return all shard ids the request should run on | ||
*/ | ||
protected List<ShardId> shards(Request request, ProjectMetadata project, RoutingTable indexRoutingTables) { | ||
protected List<ShardId> shards(Request request, ProjectState projectState) { | ||
assert Transports.assertNotTransportThread("may hit all the shards"); | ||
List<ShardId> shardIds = new ArrayList<>(); | ||
String[] concreteIndices = indexNameExpressionResolver.concreteIndexNames(project, request); | ||
|
||
OperationRouting operationRouting = clusterService.operationRouting(); | ||
|
||
String[] concreteIndices = indexNameExpressionResolver.concreteIndexNames(projectState.metadata(), request); | ||
for (String index : concreteIndices) { | ||
IndexMetadata indexMetadata = project.indices().get(index); | ||
if (indexMetadata != null) { | ||
final IndexRoutingTable indexRoutingTable = indexRoutingTables.indicesRouting().get(index); | ||
for (int i = 0; i < indexRoutingTable.size(); i++) { | ||
shardIds.add(indexRoutingTable.shard(i).shardId()); | ||
} | ||
Iterator<IndexShardRoutingTable> iterator = operationRouting.allWritableShards(projectState, index); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This handles refresh and flush APIs. |
||
while (iterator.hasNext()) { | ||
shardIds.add(iterator.next().shardId()); | ||
} | ||
} | ||
return shardIds; | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -11,6 +11,8 @@ | |
|
||
import org.elasticsearch.cluster.ProjectState; | ||
import org.elasticsearch.cluster.metadata.IndexMetadata; | ||
import org.elasticsearch.cluster.metadata.IndexReshardingMetadata; | ||
import org.elasticsearch.cluster.metadata.IndexReshardingState; | ||
import org.elasticsearch.cluster.metadata.ProjectMetadata; | ||
import org.elasticsearch.cluster.node.DiscoveryNodes; | ||
import org.elasticsearch.common.Strings; | ||
|
@@ -26,6 +28,7 @@ | |
import java.util.Arrays; | ||
import java.util.Collections; | ||
import java.util.HashSet; | ||
import java.util.Iterator; | ||
import java.util.List; | ||
import java.util.Map; | ||
import java.util.Set; | ||
|
@@ -112,6 +115,10 @@ public List<ShardIterator> searchShards( | |
return res; | ||
} | ||
|
||
public Iterator<IndexShardRoutingTable> allWritableShards(ProjectState projectState, String index) { | ||
return allShardsReadyForWrites(projectState, index); | ||
} | ||
|
||
public static ShardIterator getShards(RoutingTable routingTable, ShardId shardId) { | ||
final IndexShardRoutingTable shard = routingTable.shardRoutingTable(shardId); | ||
return shard.activeInitializingShardsRandomIt(); | ||
|
@@ -125,7 +132,7 @@ private static Set<IndexShardRoutingTable> computeTargetedShards( | |
// we use set here and not list since we might get duplicates | ||
final Set<IndexShardRoutingTable> set = new HashSet<>(); | ||
if (routing == null || routing.isEmpty()) { | ||
collectTargetShardsNoRouting(projectState.routingTable(), concreteIndices, set); | ||
collectTargetShardsNoRouting(projectState, concreteIndices, set); | ||
} else { | ||
collectTargetShardsWithRouting(projectState, concreteIndices, routing, set); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. how is this branch resolving target shards to source shards before split? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is handled inside There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ok, but what about this codepath in
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, it should, thanks for catching that. |
||
} | ||
|
@@ -154,13 +161,53 @@ private static void collectTargetShardsWithRouting( | |
} | ||
} | ||
|
||
private static void collectTargetShardsNoRouting(RoutingTable routingTable, String[] concreteIndices, Set<IndexShardRoutingTable> set) { | ||
private static void collectTargetShardsNoRouting(ProjectState projectState, String[] concreteIndices, Set<IndexShardRoutingTable> set) { | ||
for (String index : concreteIndices) { | ||
final IndexRoutingTable indexRoutingTable = indexRoutingTable(routingTable, index); | ||
for (int i = 0; i < indexRoutingTable.size(); i++) { | ||
set.add(indexRoutingTable.shard(i)); | ||
Iterator<IndexShardRoutingTable> iterator = allShardsReadyForSearch(projectState, index); | ||
while (iterator.hasNext()) { | ||
set.add(iterator.next()); | ||
} | ||
} | ||
} | ||
|
||
/** | ||
* Returns an iterator of shards of the index that are ready to execute search requests. | ||
* A shard may not be ready to execute these operations during processes like resharding. | ||
*/ | ||
private static Iterator<IndexShardRoutingTable> allShardsReadyForSearch(ProjectState projectState, String index) { | ||
return allShardsExceptSplitTargetsInStateBefore(projectState, index, IndexReshardingState.Split.TargetShardState.SPLIT); | ||
} | ||
|
||
/** | ||
* Returns an iterator of shards of the index that are ready to execute write requests. | ||
* A shard may not be ready to execute these operations during processes like resharding. | ||
*/ | ||
private static Iterator<IndexShardRoutingTable> allShardsReadyForWrites(ProjectState projectState, String index) { | ||
|
||
return allShardsExceptSplitTargetsInStateBefore(projectState, index, IndexReshardingState.Split.TargetShardState.HANDOFF); | ||
} | ||
|
||
private static Iterator<IndexShardRoutingTable> allShardsExceptSplitTargetsInStateBefore( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can we add a comment here so the reader knows it is associated with resharding and not to be confused with the split API. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think that class level documentation on |
||
ProjectState projectState, | ||
String index, | ||
IndexReshardingState.Split.TargetShardState targetShardState | ||
) { | ||
final IndexRoutingTable indexRoutingTable = indexRoutingTable(projectState.routingTable(), index); | ||
final IndexMetadata indexMetadata = indexMetadata(projectState.metadata(), index); | ||
if (indexMetadata.getReshardingMetadata() == null) { | ||
return indexRoutingTable.allShards().iterator(); | ||
} | ||
|
||
final IndexReshardingMetadata indexReshardingMetadata = indexMetadata.getReshardingMetadata(); | ||
assert indexReshardingMetadata.isSplit(); | ||
final IndexReshardingState.Split splitState = indexReshardingMetadata.getSplit(); | ||
|
||
var shards = new ArrayList<IndexShardRoutingTable>(); | ||
for (int i = 0; i < indexRoutingTable.size(); i++) { | ||
if (splitState.isTargetShard(i) == false || splitState.targetStateAtLeast(i, targetShardState)) { | ||
shards.add(indexRoutingTable.shard(i)); | ||
} | ||
} | ||
return shards.iterator(); | ||
} | ||
|
||
private ShardIterator preferenceActiveShardIterator( | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: I don't know how expensive a projectResolver is but perhaps it's a little cheaper to resolve once and then retrieve ProjectMetadata from projectState.metadata() ?