|
28 | 28 | import java.util.Arrays; |
29 | 29 | import java.util.Collections; |
30 | 30 | import java.util.Comparator; |
| 31 | +import java.util.HashMap; |
31 | 32 | import java.util.HashSet; |
32 | 33 | import java.util.List; |
33 | 34 | import java.util.Locale; |
@@ -258,22 +259,14 @@ public ShardIterator activeInitializingShardsRankedIt( |
258 | 259 | return new ShardIterator(shardId, ordered); |
259 | 260 | } |
260 | 261 |
|
261 | | - private static Set<String> getAllNodeIds(final List<ShardRouting> shards) { |
262 | | - final Set<String> nodeIds = new HashSet<>(); |
263 | | - for (ShardRouting shard : shards) { |
264 | | - nodeIds.add(shard.currentNodeId()); |
265 | | - } |
266 | | - return nodeIds; |
267 | | - } |
268 | | - |
269 | 262 | private static Map<String, Optional<ResponseCollectorService.ComputedNodeStats>> getNodeStats( |
270 | | - final Set<String> nodeIds, |
| 263 | + List<ShardRouting> shardRoutings, |
271 | 264 | final ResponseCollectorService collector |
272 | 265 | ) { |
273 | 266 |
|
274 | | - final Map<String, Optional<ResponseCollectorService.ComputedNodeStats>> nodeStats = Maps.newMapWithExpectedSize(nodeIds.size()); |
275 | | - for (String nodeId : nodeIds) { |
276 | | - nodeStats.put(nodeId, collector.getNodeStatistics(nodeId)); |
| 267 | + final Map<String, Optional<ResponseCollectorService.ComputedNodeStats>> nodeStats = new HashMap<>(); |
| 268 | + for (ShardRouting shardRouting : shardRoutings) { |
| 269 | + nodeStats.computeIfAbsent(shardRouting.currentNodeId(), collector::getNodeStatistics); |
277 | 270 | } |
278 | 271 | return nodeStats; |
279 | 272 | } |
@@ -342,32 +335,28 @@ private static List<ShardRouting> rankShardsAndUpdateStats( |
342 | 335 | } |
343 | 336 |
|
344 | 337 | // Retrieve which nodes we can potentially send the query to |
345 | | - final Set<String> nodeIds = getAllNodeIds(shards); |
346 | | - final Map<String, Optional<ResponseCollectorService.ComputedNodeStats>> nodeStats = getNodeStats(nodeIds, collector); |
| 338 | + final Map<String, Optional<ResponseCollectorService.ComputedNodeStats>> nodeStats = getNodeStats(shards, collector); |
347 | 339 |
|
348 | 340 | // Retrieve all the nodes the shards exist on |
349 | | - final Map<String, Double> nodeRanks = rankNodes(nodeStats, nodeSearchCounts); |
350 | 341 |
|
351 | 342 | // sort all shards based on the shard rank |
352 | 343 | ArrayList<ShardRouting> sortedShards = new ArrayList<>(shards); |
353 | | - Collections.sort(sortedShards, new NodeRankComparator(nodeRanks)); |
| 344 | + sortedShards.sort(new NodeRankComparator(rankNodes(nodeStats, nodeSearchCounts))); |
354 | 345 |
|
355 | 346 | // adjust the non-winner nodes' stats so they will get a chance to receive queries |
356 | | - if (sortedShards.size() > 1) { |
357 | | - ShardRouting minShard = sortedShards.get(0); |
358 | | - // If the winning shard is not started we are ranking initializing |
359 | | - // shards, don't bother to do adjustments |
360 | | - if (minShard.started()) { |
361 | | - String minNodeId = minShard.currentNodeId(); |
362 | | - Optional<ResponseCollectorService.ComputedNodeStats> maybeMinStats = nodeStats.get(minNodeId); |
363 | | - if (maybeMinStats.isPresent()) { |
364 | | - adjustStats(collector, nodeStats, minNodeId, maybeMinStats.get()); |
365 | | - // Increase the number of searches for the "winning" node by one. |
366 | | - // Note that this doesn't actually affect the "real" counts, instead |
367 | | - // it only affects the captured node search counts, which is |
368 | | - // captured once for each query in TransportSearchAction |
369 | | - nodeSearchCounts.compute(minNodeId, (id, conns) -> conns == null ? 1 : conns + 1); |
370 | | - } |
| 347 | + ShardRouting minShard = sortedShards.get(0); |
| 348 | + // If the winning shard is not started we are ranking initializing |
| 349 | + // shards, don't bother to do adjustments |
| 350 | + if (minShard.started()) { |
| 351 | + String minNodeId = minShard.currentNodeId(); |
| 352 | + Optional<ResponseCollectorService.ComputedNodeStats> maybeMinStats = nodeStats.get(minNodeId); |
| 353 | + if (maybeMinStats.isPresent()) { |
| 354 | + adjustStats(collector, nodeStats, minNodeId, maybeMinStats.get()); |
| 355 | + // Increase the number of searches for the "winning" node by one. |
| 356 | + // Note that this doesn't actually affect the "real" counts, instead |
| 357 | + // it only affects the captured node search counts, which is |
| 358 | + // captured once for each query in TransportSearchAction |
| 359 | + nodeSearchCounts.compute(minNodeId, (id, conns) -> conns == null ? 1 : conns + 1); |
371 | 360 | } |
372 | 361 | } |
373 | 362 |
|
|
0 commit comments