Skip to content

Commit 3825623

Browse files
authored
Record stats for "adaptive replica selection" in batched queries (#144563)
Batched queries currently do not collect any of the stats needed for the ARS system to function, which leads to hot spotting when many queries are batched. This records the S (service time) and q (queue size) values from the last QuerySearchResult in the batched response. Shard queries execute in parallel but are invoked in order, so the last result gives the most recent queue size picture. The service time may vary across shards so the last result is simply an arbitrary sample. Note: we still collect R (total response time) as a side effect of using SearchExecutionStatsCollector, but it is not particularly useful for batched requests in the ARS formula, which only uses (R - S) to estimate network overhead and queue time.
1 parent 9227271 commit 3825623

File tree

4 files changed

+88
-0
lines changed

4 files changed

+88
-0
lines changed

docs/changelog/144563.yaml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
area: Search
2+
issues:
3+
- 144146
4+
pr: 144563
5+
summary: Record stats for "adaptive replica selection" in batched queries
6+
type: bug

server/src/internalClusterTest/java/org/elasticsearch/action/search/BatchedQueryPhaseIT.java

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
1717
import org.elasticsearch.cluster.routing.ShardRouting;
1818
import org.elasticsearch.common.settings.Settings;
19+
import org.elasticsearch.node.ResponseCollectorService;
1920
import org.elasticsearch.search.SearchService;
2021
import org.elasticsearch.test.ESIntegTestCase;
2122

@@ -28,6 +29,8 @@
2829
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
2930
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailuresAndResponse;
3031
import static org.hamcrest.Matchers.equalTo;
32+
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
33+
import static org.hamcrest.Matchers.notNullValue;
3134

3235
public class BatchedQueryPhaseIT extends ESIntegTestCase {
3336

@@ -73,6 +76,69 @@ public void testNumReducePhases() {
7376
);
7477
}
7578

79+
public void testAdaptiveReplicaSelectionStatsWithBatchedQueryPhase() {
80+
assumeTrue(
81+
"test skipped because batched query execution disabled by feature flag",
82+
SearchService.BATCHED_QUERY_PHASE_FEATURE_FLAG.isEnabled()
83+
);
84+
internalCluster().ensureAtLeastNumDataNodes(3);
85+
86+
String indexName = "test-ars-stats";
87+
int numDataNodes = internalCluster().numDataNodes();
88+
int numShards = numDataNodes * 4;
89+
assertAcked(
90+
prepareCreate(indexName).setMapping("title", "type=keyword")
91+
.setSettings(
92+
Settings.builder()
93+
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, numShards)
94+
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
95+
)
96+
);
97+
for (int i = 0; i < 100; i++) {
98+
prepareIndex(indexName).setId(Integer.toString(i)).setSource("title", "testing" + i).get();
99+
}
100+
refresh();
101+
ensureGreen(indexName);
102+
103+
String coordinatorNode = internalCluster().getRandomNodeName();
104+
String coordinatorNodeId = getNodeId(coordinatorNode);
105+
106+
for (int i = 0; i < 20; i++) {
107+
assertNoFailuresAndResponse(client(coordinatorNode).prepareSearch(indexName).setSearchType(QUERY_THEN_FETCH), response -> {});
108+
}
109+
110+
ResponseCollectorService responseCollectorService = internalCluster().getInstance(ResponseCollectorService.class, coordinatorNode);
111+
Map<String, ResponseCollectorService.ComputedNodeStats> allStats = responseCollectorService.getAllNodeStatistics();
112+
Map<String, Integer> shardsPerNode = getNodeToShardCountMap(indexName);
113+
114+
long batchedRemoteNodes = shardsPerNode.entrySet()
115+
.stream()
116+
.filter(e -> e.getKey().equals(coordinatorNodeId) == false && e.getValue() >= 2)
117+
.count();
118+
assertThat(
119+
"Expected at least one remote node with 2+ shards to exercise the batched query path",
120+
batchedRemoteNodes,
121+
greaterThanOrEqualTo(1L)
122+
);
123+
124+
for (Map.Entry<String, Integer> entry : shardsPerNode.entrySet()) {
125+
String nodeId = entry.getKey();
126+
if (nodeId.equals(coordinatorNodeId)) {
127+
continue;
128+
}
129+
assertThat(
130+
"Expected adaptive replica selection stats for remote node ["
131+
+ nodeId
132+
+ "] with ["
133+
+ entry.getValue()
134+
+ "] shards, but found none. Stats were only collected for nodes: "
135+
+ allStats.keySet(),
136+
allStats.get(nodeId),
137+
notNullValue()
138+
);
139+
}
140+
}
141+
76142
private Map<String, Integer> getNodeToShardCountMap(String indexName) {
77143
ClusterState clusterState = clusterAdmin().prepareState(TEST_REQUEST_TIMEOUT).get().getState();
78144
IndexRoutingTable indexRoutingTable = clusterState.routingTable(ProjectId.DEFAULT).index(indexName);

server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -534,6 +534,7 @@ protected void doRun(Map<SearchShardIterator, Integer> shardIndexMap) {
534534
executeWithoutBatching(routing, request);
535535
return;
536536
}
537+
final ActionListener<? super SearchPhaseResult> statsCollector = searchTransportService.newStatsCollector(connection);
537538
searchTransportService.transportService()
538539
.sendChildRequest(connection, NODE_SEARCH_ACTION_NAME, request, task, new TransportResponseHandler<NodeQueryResponse>() {
539540
@Override
@@ -556,13 +557,15 @@ public void handleResponse(NodeQueryResponse response) {
556557
queryPhaseResultConsumer.addBatchedPartialResult(response.topDocsStats, response.mergeResult);
557558
}
558559
}
560+
SearchPhaseResult lastResult = null;
559561
for (int i = 0; i < response.results.length; i++) {
560562
var s = request.shards.get(i);
561563
int shardIdx = s.shardIndex;
562564
final SearchShardTarget target = new SearchShardTarget(routing.nodeId(), s.shardId, routing.clusterAlias());
563565
switch (response.results[i]) {
564566
case Exception e -> onShardFailure(shardIdx, target, shardIterators[shardIdx], e);
565567
case SearchPhaseResult q -> {
568+
lastResult = q;
566569
q.setShardIndex(shardIdx);
567570
q.setSearchShardTarget(target);
568571
onShardResult(q);
@@ -572,6 +575,12 @@ public void handleResponse(NodeQueryResponse response) {
572575
}
573576
}
574577
}
578+
if (statsCollector != null && lastResult != null) {
579+
// The last result is the most likely to give us the most recent picture of queue size, as shards queries are
580+
// invoked in the same order as the response's results array.
581+
// The service time may vary across shards so the last result is simply an arbitrary choice.
582+
statsCollector.onResponse(lastResult);
583+
}
575584
}
576585

577586
@Override

server/src/main/java/org/elasticsearch/action/search/SearchTransportService.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -211,6 +211,13 @@ public void sendExecuteQuery(
211211
);
212212
}
213213

214+
ActionListener<? super SearchPhaseResult> newStatsCollector(Transport.Connection connection) {
215+
if (responseWrapper == null) {
216+
return null;
217+
}
218+
return responseWrapper.apply(connection, ActionListener.noop());
219+
}
220+
214221
public void sendExecuteQuery(
215222
Transport.Connection connection,
216223
final QuerySearchRequest request,

0 commit comments

Comments
 (0)