| 
28 | 28 | import java.util.Arrays;  | 
29 | 29 | import java.util.Collections;  | 
30 | 30 | import java.util.Comparator;  | 
 | 31 | +import java.util.HashMap;  | 
31 | 32 | import java.util.HashSet;  | 
32 | 33 | import java.util.List;  | 
33 | 34 | import java.util.Locale;  | 
@@ -258,22 +259,14 @@ public ShardIterator activeInitializingShardsRankedIt(  | 
258 | 259 |         return new ShardIterator(shardId, ordered);  | 
259 | 260 |     }  | 
260 | 261 | 
 
  | 
261 |  | -    private static Set<String> getAllNodeIds(final List<ShardRouting> shards) {  | 
262 |  | -        final Set<String> nodeIds = new HashSet<>();  | 
263 |  | -        for (ShardRouting shard : shards) {  | 
264 |  | -            nodeIds.add(shard.currentNodeId());  | 
265 |  | -        }  | 
266 |  | -        return nodeIds;  | 
267 |  | -    }  | 
268 |  | - | 
269 | 262 |     private static Map<String, Optional<ResponseCollectorService.ComputedNodeStats>> getNodeStats(  | 
270 |  | -        final Set<String> nodeIds,  | 
 | 263 | +        List<ShardRouting> shardRoutings,  | 
271 | 264 |         final ResponseCollectorService collector  | 
272 | 265 |     ) {  | 
273 | 266 | 
 
  | 
274 |  | -        final Map<String, Optional<ResponseCollectorService.ComputedNodeStats>> nodeStats = Maps.newMapWithExpectedSize(nodeIds.size());  | 
275 |  | -        for (String nodeId : nodeIds) {  | 
276 |  | -            nodeStats.put(nodeId, collector.getNodeStatistics(nodeId));  | 
 | 267 | +        final Map<String, Optional<ResponseCollectorService.ComputedNodeStats>> nodeStats = new HashMap<>();  | 
 | 268 | +        for (ShardRouting shardRouting : shardRoutings) {  | 
 | 269 | +            nodeStats.computeIfAbsent(shardRouting.currentNodeId(), collector::getNodeStatistics);  | 
277 | 270 |         }  | 
278 | 271 |         return nodeStats;  | 
279 | 272 |     }  | 
@@ -342,32 +335,28 @@ private static List<ShardRouting> rankShardsAndUpdateStats(  | 
342 | 335 |         }  | 
343 | 336 | 
 
  | 
344 | 337 |         // Retrieve which nodes we can potentially send the query to  | 
345 |  | -        final Set<String> nodeIds = getAllNodeIds(shards);  | 
346 |  | -        final Map<String, Optional<ResponseCollectorService.ComputedNodeStats>> nodeStats = getNodeStats(nodeIds, collector);  | 
 | 338 | +        final Map<String, Optional<ResponseCollectorService.ComputedNodeStats>> nodeStats = getNodeStats(shards, collector);  | 
347 | 339 | 
 
  | 
348 | 340 |         // Retrieve all the nodes the shards exist on  | 
349 |  | -        final Map<String, Double> nodeRanks = rankNodes(nodeStats, nodeSearchCounts);  | 
350 | 341 | 
 
  | 
351 | 342 |         // sort all shards based on the shard rank  | 
352 | 343 |         ArrayList<ShardRouting> sortedShards = new ArrayList<>(shards);  | 
353 |  | -        Collections.sort(sortedShards, new NodeRankComparator(nodeRanks));  | 
 | 344 | +        sortedShards.sort(new NodeRankComparator(rankNodes(nodeStats, nodeSearchCounts)));  | 
354 | 345 | 
 
  | 
355 | 346 |         // adjust the non-winner nodes' stats so they will get a chance to receive queries  | 
356 |  | -        if (sortedShards.size() > 1) {  | 
357 |  | -            ShardRouting minShard = sortedShards.get(0);  | 
358 |  | -            // If the winning shard is not started we are ranking initializing  | 
359 |  | -            // shards, don't bother to do adjustments  | 
360 |  | -            if (minShard.started()) {  | 
361 |  | -                String minNodeId = minShard.currentNodeId();  | 
362 |  | -                Optional<ResponseCollectorService.ComputedNodeStats> maybeMinStats = nodeStats.get(minNodeId);  | 
363 |  | -                if (maybeMinStats.isPresent()) {  | 
364 |  | -                    adjustStats(collector, nodeStats, minNodeId, maybeMinStats.get());  | 
365 |  | -                    // Increase the number of searches for the "winning" node by one.  | 
366 |  | -                    // Note that this doesn't actually affect the "real" counts, instead  | 
367 |  | -                    // it only affects the captured node search counts, which is  | 
368 |  | -                    // captured once for each query in TransportSearchAction  | 
369 |  | -                    nodeSearchCounts.compute(minNodeId, (id, conns) -> conns == null ? 1 : conns + 1);  | 
370 |  | -                }  | 
 | 347 | +        ShardRouting minShard = sortedShards.get(0);  | 
 | 348 | +        // If the winning shard is not started we are ranking initializing  | 
 | 349 | +        // shards, don't bother to do adjustments  | 
 | 350 | +        if (minShard.started()) {  | 
 | 351 | +            String minNodeId = minShard.currentNodeId();  | 
 | 352 | +            Optional<ResponseCollectorService.ComputedNodeStats> maybeMinStats = nodeStats.get(minNodeId);  | 
 | 353 | +            if (maybeMinStats.isPresent()) {  | 
 | 354 | +                adjustStats(collector, nodeStats, minNodeId, maybeMinStats.get());  | 
 | 355 | +                // Increase the number of searches for the "winning" node by one.  | 
 | 356 | +                // Note that this doesn't actually affect the "real" counts, instead  | 
 | 357 | +                // it only affects the captured node search counts, which is  | 
 | 358 | +                // captured once for each query in TransportSearchAction  | 
 | 359 | +                nodeSearchCounts.compute(minNodeId, (id, conns) -> conns == null ? 1 : conns + 1);  | 
371 | 360 |             }  | 
372 | 361 |         }  | 
373 | 362 | 
 
  | 
 | 
0 commit comments