Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.util.CollectionUtil;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.TransportVersions;
import org.elasticsearch.action.ActionListener;
Expand Down Expand Up @@ -1430,7 +1429,7 @@ static List<SearchShardIterator> mergeShardsIterators(
} else {
shards = CollectionUtils.concatLists(remoteShardIterators, localShardIterators);
}
CollectionUtil.timSort(shards);
shards.sort(SearchShardIterator::compareTo);
return shards;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
Expand Down Expand Up @@ -258,22 +259,14 @@ public ShardIterator activeInitializingShardsRankedIt(
return new ShardIterator(shardId, ordered);
}

private static Set<String> getAllNodeIds(final List<ShardRouting> shards) {
final Set<String> nodeIds = new HashSet<>();
for (ShardRouting shard : shards) {
nodeIds.add(shard.currentNodeId());
}
return nodeIds;
}

private static Map<String, Optional<ResponseCollectorService.ComputedNodeStats>> getNodeStats(
final Set<String> nodeIds,
List<ShardRouting> shardRoutings,
final ResponseCollectorService collector
) {

final Map<String, Optional<ResponseCollectorService.ComputedNodeStats>> nodeStats = Maps.newMapWithExpectedSize(nodeIds.size());
for (String nodeId : nodeIds) {
nodeStats.put(nodeId, collector.getNodeStatistics(nodeId));
final Map<String, Optional<ResponseCollectorService.ComputedNodeStats>> nodeStats = new HashMap<>();
for (ShardRouting shardRouting : shardRoutings) {
nodeStats.computeIfAbsent(shardRouting.currentNodeId(), collector::getNodeStatistics);
}
return nodeStats;
}
Expand Down Expand Up @@ -342,32 +335,28 @@ private static List<ShardRouting> rankShardsAndUpdateStats(
}

// Retrieve which nodes we can potentially send the query to
final Set<String> nodeIds = getAllNodeIds(shards);
final Map<String, Optional<ResponseCollectorService.ComputedNodeStats>> nodeStats = getNodeStats(nodeIds, collector);
final Map<String, Optional<ResponseCollectorService.ComputedNodeStats>> nodeStats = getNodeStats(shards, collector);

// Retrieve all the nodes the shards exist on
final Map<String, Double> nodeRanks = rankNodes(nodeStats, nodeSearchCounts);

// sort all shards based on the shard rank
ArrayList<ShardRouting> sortedShards = new ArrayList<>(shards);
Collections.sort(sortedShards, new NodeRankComparator(nodeRanks));
sortedShards.sort(new NodeRankComparator(rankNodes(nodeStats, nodeSearchCounts)));

// adjust the non-winner nodes' stats so they will get a chance to receive queries
if (sortedShards.size() > 1) {
ShardRouting minShard = sortedShards.get(0);
// If the winning shard is not started we are ranking initializing
// shards, don't bother to do adjustments
if (minShard.started()) {
String minNodeId = minShard.currentNodeId();
Optional<ResponseCollectorService.ComputedNodeStats> maybeMinStats = nodeStats.get(minNodeId);
if (maybeMinStats.isPresent()) {
adjustStats(collector, nodeStats, minNodeId, maybeMinStats.get());
// Increase the number of searches for the "winning" node by one.
// Note that this doesn't actually affect the "real" counts, instead
// it only affects the captured node search counts, which is
// captured once for each query in TransportSearchAction
nodeSearchCounts.compute(minNodeId, (id, conns) -> conns == null ? 1 : conns + 1);
}
ShardRouting minShard = sortedShards.get(0);
// If the winning shard is not started we are ranking initializing
// shards, don't bother to do adjustments
if (minShard.started()) {
String minNodeId = minShard.currentNodeId();
Optional<ResponseCollectorService.ComputedNodeStats> maybeMinStats = nodeStats.get(minNodeId);
if (maybeMinStats.isPresent()) {
adjustStats(collector, nodeStats, minNodeId, maybeMinStats.get());
// Increase the number of searches for the "winning" node by one.
// Note that this doesn't actually affect the "real" counts, instead
// it only affects the captured node search counts, which is
// captured once for each query in TransportSearchAction
nodeSearchCounts.compute(minNodeId, (id, conns) -> conns == null ? 1 : conns + 1);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@

package org.elasticsearch.cluster.routing;

import org.apache.lucene.util.CollectionUtil;
import org.elasticsearch.cluster.ProjectState;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.ProjectMetadata;
Expand All @@ -19,7 +18,6 @@
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.shard.ShardId;
Expand Down Expand Up @@ -110,9 +108,9 @@ public List<ShardIterator> searchShards(
@Nullable ResponseCollectorService collectorService,
@Nullable Map<String, Long> nodeCounts
) {
Set<IndexShardRoutingTable> shards = computeTargetedShards(projectState, concreteIndices, routing);
final Set<IndexShardRoutingTable> shards = computeTargetedShards(projectState, concreteIndices, routing);
DiscoveryNodes nodes = projectState.cluster().nodes();
Set<ShardIterator> set = Sets.newHashSetWithExpectedSize(shards.size());
List<ShardIterator> res = new ArrayList<>(shards.size());
for (IndexShardRoutingTable shard : shards) {
ShardIterator iterator = preferenceActiveShardIterator(
shard,
Expand All @@ -123,11 +121,10 @@ public List<ShardIterator> searchShards(
nodeCounts
);
if (iterator != null) {
set.add(ShardIterator.allSearchableShards(iterator));
res.add(ShardIterator.allSearchableShards(iterator));
}
}
List<ShardIterator> res = new ArrayList<>(set);
CollectionUtil.timSort(res);
res.sort(ShardIterator::compareTo);
return res;
}

Expand Down