Skip to content

Commit a92841f

Browse files
benchaplin, mridula-s109
authored and committed
Add IT for num_reduced_phases with batched query execution (elastic#134312)
1 parent 3ef70f4 commit a92841f

File tree

3 files changed

+93
-3
lines changed

3 files changed

+93
-3
lines changed

rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/search/120_batch_reduce_size.yml

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,4 @@
11
setup:
2-
- skip:
3-
awaits_fix: "TODO fix this test, the response with batched execution is not deterministic enough for the available matchers"
4-
52
- do:
63
indices.create:
74
index: test_1
Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.action.search;
11+
12+
import org.elasticsearch.cluster.ClusterState;
13+
import org.elasticsearch.cluster.metadata.IndexMetadata;
14+
import org.elasticsearch.cluster.metadata.ProjectId;
15+
import org.elasticsearch.cluster.routing.IndexRoutingTable;
16+
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
17+
import org.elasticsearch.cluster.routing.ShardRouting;
18+
import org.elasticsearch.common.settings.Settings;
19+
import org.elasticsearch.test.ESIntegTestCase;
20+
21+
import java.util.Collections;
22+
import java.util.HashMap;
23+
import java.util.Map;
24+
25+
import static org.elasticsearch.action.search.SearchType.QUERY_THEN_FETCH;
26+
import static org.elasticsearch.search.aggregations.AggregationBuilders.terms;
27+
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
28+
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailuresAndResponse;
29+
import static org.hamcrest.Matchers.equalTo;
30+
31+
public class BatchedQueryPhaseIT extends ESIntegTestCase {
32+
33+
public void testNumReducePhases() {
34+
String indexName = "test-idx";
35+
assertAcked(
36+
prepareCreate(indexName).setMapping("title", "type=keyword")
37+
.setSettings(Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0))
38+
);
39+
for (int i = 0; i < 100; i++) {
40+
prepareIndex(indexName).setId(Integer.toString(i)).setSource("title", "testing" + i).get();
41+
}
42+
refresh();
43+
44+
final String coordinatorNode = internalCluster().getRandomNodeName();
45+
final String coordinatorNodeId = getNodeId(coordinatorNode);
46+
assertNoFailuresAndResponse(
47+
client(coordinatorNode).prepareSearch(indexName)
48+
.setBatchedReduceSize(2)
49+
.addAggregation(terms("terms").field("title"))
50+
.setSearchType(QUERY_THEN_FETCH),
51+
response -> {
52+
Map<String, Integer> shardsPerNode = getNodeToShardCountMap(indexName);
53+
// Shards are not batched if they are already on the coordinating node or if there is only one per data node.
54+
final int coordinatorShards = shardsPerNode.getOrDefault(coordinatorNodeId, 0);
55+
final long otherSingleShardNodes = shardsPerNode.entrySet()
56+
.stream()
57+
.filter(entry -> entry.getKey().equals(coordinatorNodeId) == false)
58+
.filter(entry -> entry.getValue() == 1)
59+
.count();
60+
final int numNotBatchedShards = coordinatorShards + (int) otherSingleShardNodes;
61+
62+
// Because batched_reduce_size = 2, whenever two or more shard results exist on the coordinating node, they will be
63+
// partially reduced (batched queries do not count towards num_reduce_phases).
64+
// Hence, the formula: (# of NOT batched shards) - 1.
65+
final int expectedNumReducePhases = Math.max(1, numNotBatchedShards - 1);
66+
assertThat(response.getNumReducePhases(), equalTo(expectedNumReducePhases));
67+
}
68+
);
69+
}
70+
71+
private Map<String, Integer> getNodeToShardCountMap(String indexName) {
72+
ClusterState clusterState = clusterAdmin().prepareState(TEST_REQUEST_TIMEOUT).get().getState();
73+
IndexRoutingTable indexRoutingTable = clusterState.routingTable(ProjectId.DEFAULT).index(indexName);
74+
if (indexRoutingTable == null) {
75+
return Collections.emptyMap();
76+
}
77+
78+
Map<String, Integer> nodeToShardCount = new HashMap<>();
79+
for (int shardId = 0; shardId < indexRoutingTable.size(); shardId++) {
80+
IndexShardRoutingTable shardRoutingTable = indexRoutingTable.shard(shardId);
81+
for (int copy = 0; copy < shardRoutingTable.size(); copy++) {
82+
ShardRouting shardRouting = shardRoutingTable.shard(copy);
83+
String nodeId = shardRouting.currentNodeId();
84+
if (nodeId != null) {
85+
nodeToShardCount.merge(nodeId, 1, Integer::sum);
86+
}
87+
}
88+
}
89+
90+
return nodeToShardCount;
91+
}
92+
}

server/src/main/java/org/elasticsearch/action/search/QueryPhaseResultConsumer.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,7 @@ public class QueryPhaseResultConsumer extends ArraySearchPhaseResults<SearchPhas
9595
final TopDocsStats topDocsStats;
9696
private volatile MergeResult mergeResult;
9797
private volatile boolean hasPartialReduce;
98+
// Note: at this time, numReducePhases does not count reductions that occur on the data node as part of batched query execution.
9899
private volatile int numReducePhases;
99100

100101
/**

0 commit comments

Comments
 (0)