Skip to content

Commit 9981768

Browse files
authored
Merge branch 'main' into cef_processor
2 parents 15a0de2 + fd2cc97 commit 9981768

File tree

25 files changed

+1157
-65
lines changed

25 files changed

+1157
-65
lines changed

docs/changelog/121885.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 121885
2+
summary: Introduce batched query execution and data-node side reduce
3+
area: Search
4+
type: enhancement
5+
issues: []

qa/smoke-test-http/src/internalClusterTest/java/org/elasticsearch/http/SearchErrorTraceIT.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,12 +14,15 @@
1414
import org.elasticsearch.action.search.MultiSearchRequest;
1515
import org.elasticsearch.action.search.SearchRequest;
1616
import org.elasticsearch.client.Request;
17+
import org.elasticsearch.common.settings.Settings;
1718
import org.elasticsearch.common.util.CollectionUtils;
1819
import org.elasticsearch.plugins.Plugin;
1920
import org.elasticsearch.search.ErrorTraceHelper;
21+
import org.elasticsearch.search.SearchService;
2022
import org.elasticsearch.search.builder.SearchSourceBuilder;
2123
import org.elasticsearch.test.transport.MockTransportService;
2224
import org.elasticsearch.xcontent.XContentType;
25+
import org.junit.After;
2326
import org.junit.Before;
2427

2528
import java.io.IOException;
@@ -40,6 +43,13 @@ protected Collection<Class<? extends Plugin>> nodePlugins() {
4043
@Before
4144
public void setupMessageListener() {
4245
hasStackTrace = ErrorTraceHelper.setupErrorTraceListener(internalCluster());
46+
// TODO: make this test work with batched query execution by enhancing ErrorTraceHelper.setupErrorTraceListener
47+
updateClusterSettings(Settings.builder().put(SearchService.BATCHED_QUERY_PHASE.getKey(), false));
48+
}
49+
50+
@After
51+
public void resetSettings() {
52+
updateClusterSettings(Settings.builder().putNull(SearchService.BATCHED_QUERY_PHASE.getKey()));
4353
}
4454

4555
private void setupIndexWithDocs() {

rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/search/120_batch_reduce_size.yml

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,7 @@
11
setup:
2+
- skip:
3+
awaits_fix: "TODO fix this test, the response with batched execution is not deterministic enough for the available matchers"
4+
25
- do:
36
indices.create:
47
index: test_1
@@ -48,7 +51,6 @@ setup:
4851
batched_reduce_size: 2
4952
body: { "size" : 0, "aggs" : { "str_terms" : { "terms" : { "field" : "str" } } } }
5053

51-
- match: { num_reduce_phases: 4 }
5254
- match: { hits.total: 3 }
5355
- length: { aggregations.str_terms.buckets: 2 }
5456
- match: { aggregations.str_terms.buckets.0.key: "abc" }

server/src/internalClusterTest/java/org/elasticsearch/action/IndicesRequestIT.java

Lines changed: 2 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -562,11 +562,8 @@ public void testSearchQueryThenFetch() throws Exception {
562562
);
563563

564564
clearInterceptedActions();
565-
assertIndicesSubset(
566-
Arrays.asList(searchRequest.indices()),
567-
SearchTransportService.QUERY_ACTION_NAME,
568-
SearchTransportService.FETCH_ID_ACTION_NAME
569-
);
565+
assertIndicesSubset(Arrays.asList(searchRequest.indices()), true, SearchTransportService.QUERY_ACTION_NAME);
566+
assertIndicesSubset(Arrays.asList(searchRequest.indices()), SearchTransportService.FETCH_ID_ACTION_NAME);
570567
}
571568

572569
public void testSearchDfsQueryThenFetch() throws Exception {
@@ -619,10 +616,6 @@ private static void assertIndicesSubset(List<String> indices, String... actions)
619616
assertIndicesSubset(indices, false, actions);
620617
}
621618

622-
private static void assertIndicesSubsetOptionalRequests(List<String> indices, String... actions) {
623-
assertIndicesSubset(indices, true, actions);
624-
}
625-
626619
private static void assertIndicesSubset(List<String> indices, boolean optional, String... actions) {
627620
// indices returned by each bulk shard request need to be a subset of the original indices
628621
for (String action : actions) {

server/src/internalClusterTest/java/org/elasticsearch/action/admin/cluster/node/tasks/TasksIT.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
import org.elasticsearch.index.query.QueryBuilders;
4242
import org.elasticsearch.persistent.PersistentTasksCustomMetadata;
4343
import org.elasticsearch.plugins.Plugin;
44+
import org.elasticsearch.search.SearchService;
4445
import org.elasticsearch.search.builder.SearchSourceBuilder;
4546
import org.elasticsearch.tasks.RemovedTaskListener;
4647
import org.elasticsearch.tasks.Task;
@@ -352,6 +353,8 @@ public void testTransportBulkTasks() {
352353
}
353354

354355
public void testSearchTaskDescriptions() {
356+
// TODO: enhance this test to also check the tasks created by batched query execution
357+
updateClusterSettings(Settings.builder().put(SearchService.BATCHED_QUERY_PHASE.getKey(), false));
355358
registerTaskManagerListeners(TransportSearchAction.TYPE.name()); // main task
356359
registerTaskManagerListeners(TransportSearchAction.TYPE.name() + "[*]"); // shard task
357360
createIndex("test");
@@ -398,7 +401,7 @@ public void testSearchTaskDescriptions() {
398401
// assert that all task descriptions have non-zero length
399402
assertThat(taskInfo.description().length(), greaterThan(0));
400403
}
401-
404+
updateClusterSettings(Settings.builder().putNull(SearchService.BATCHED_QUERY_PHASE.getKey()));
402405
}
403406

404407
public void testSearchTaskHeaderLimit() {

server/src/internalClusterTest/java/org/elasticsearch/action/search/TransportSearchIT.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
import org.elasticsearch.rest.RestStatus;
4141
import org.elasticsearch.search.DocValueFormat;
4242
import org.elasticsearch.search.SearchHit;
43+
import org.elasticsearch.search.SearchService;
4344
import org.elasticsearch.search.aggregations.AbstractAggregationBuilder;
4445
import org.elasticsearch.search.aggregations.AggregationBuilder;
4546
import org.elasticsearch.search.aggregations.AggregationExecutionContext;
@@ -446,6 +447,7 @@ public void testSearchIdle() throws Exception {
446447
}
447448

448449
public void testCircuitBreakerReduceFail() throws Exception {
450+
updateClusterSettings(Settings.builder().put(SearchService.BATCHED_QUERY_PHASE.getKey(), false));
449451
int numShards = randomIntBetween(1, 10);
450452
indexSomeDocs("test", numShards, numShards * 3);
451453

@@ -519,7 +521,9 @@ public void onFailure(Exception exc) {
519521
}
520522
assertBusy(() -> assertThat(requestBreakerUsed(), equalTo(0L)));
521523
} finally {
522-
updateClusterSettings(Settings.builder().putNull("indices.breaker.request.limit"));
524+
updateClusterSettings(
525+
Settings.builder().putNull("indices.breaker.request.limit").putNull(SearchService.BATCHED_QUERY_PHASE.getKey())
526+
);
523527
}
524528
}
525529

server/src/internalClusterTest/java/org/elasticsearch/search/SearchCancellationIT.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.elasticsearch.action.search.TransportSearchAction;
2424
import org.elasticsearch.action.search.TransportSearchScrollAction;
2525
import org.elasticsearch.common.Strings;
26+
import org.elasticsearch.common.settings.Settings;
2627
import org.elasticsearch.core.TimeValue;
2728
import org.elasticsearch.script.Script;
2829
import org.elasticsearch.script.ScriptType;
@@ -239,6 +240,8 @@ public void testCancelMultiSearch() throws Exception {
239240
}
240241

241242
public void testCancelFailedSearchWhenPartialResultDisallowed() throws Exception {
243+
// TODO: make this test compatible with batched execution; currently the exceptions are slightly different when batching is enabled
244+
updateClusterSettings(Settings.builder().put(SearchService.BATCHED_QUERY_PHASE.getKey(), false));
242245
// Have at least two nodes so that we have parallel execution of two requests guaranteed even if max concurrent requests per node
243246
// are limited to 1
244247
internalCluster().ensureAtLeastNumDataNodes(2);

server/src/internalClusterTest/java/org/elasticsearch/search/aggregations/bucket/TermsDocCountErrorIT.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,12 +13,15 @@
1313
import org.elasticsearch.action.search.SearchResponse;
1414
import org.elasticsearch.cluster.metadata.IndexMetadata;
1515
import org.elasticsearch.common.settings.Settings;
16+
import org.elasticsearch.search.SearchService;
1617
import org.elasticsearch.search.aggregations.Aggregator.SubAggCollectionMode;
1718
import org.elasticsearch.search.aggregations.BucketOrder;
1819
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
1920
import org.elasticsearch.search.aggregations.bucket.terms.Terms.Bucket;
2021
import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregatorFactory.ExecutionMode;
2122
import org.elasticsearch.test.ESIntegTestCase;
23+
import org.junit.After;
24+
import org.junit.Before;
2225

2326
import java.io.IOException;
2427
import java.util.ArrayList;
@@ -50,6 +53,18 @@ public static String randomExecutionHint() {
5053

5154
private static int numRoutingValues;
5255

56+
@Before
57+
public void disableBatchedExecution() {
58+
// TODO: unfortunately it's practically impossible to get a 100% deterministic test with batched execution; adjust this test to
59+
// still do something useful with batched execution (i.e. use somewhat relaxed assertions)
60+
updateClusterSettings(Settings.builder().put(SearchService.BATCHED_QUERY_PHASE.getKey(), false));
61+
}
62+
63+
@After
64+
public void resetSettings() {
65+
updateClusterSettings(Settings.builder().putNull(SearchService.BATCHED_QUERY_PHASE.getKey()));
66+
}
67+
5368
@Override
5469
public void setupSuiteScopeCluster() throws Exception {
5570
assertAcked(indicesAdmin().prepareCreate("idx").setMapping(STRING_FIELD_NAME, "type=keyword").get());

server/src/main/java/org/elasticsearch/TransportVersions.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -208,6 +208,7 @@ static TransportVersion def(int id) {
208208
public static final TransportVersion PROJECT_ID_IN_SNAPSHOT = def(9_040_0_00);
209209
public static final TransportVersion INDEX_STATS_AND_METADATA_INCLUDE_PEAK_WRITE_LOAD = def(9_041_0_00);
210210
public static final TransportVersion REPOSITORIES_METADATA_AS_PROJECT_CUSTOM = def(9_042_0_00);
211+
public static final TransportVersion BATCHED_QUERY_PHASE_VERSION = def(9_043_0_00);
211212

212213
/*
213214
* STOP! READ THIS FIRST! No, really,

server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java

Lines changed: 18 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -64,33 +64,33 @@
6464
* distributed frequencies
6565
*/
6666
abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> extends SearchPhase {
67-
private static final float DEFAULT_INDEX_BOOST = 1.0f;
67+
protected static final float DEFAULT_INDEX_BOOST = 1.0f;
6868
private final Logger logger;
6969
private final NamedWriteableRegistry namedWriteableRegistry;
70-
private final SearchTransportService searchTransportService;
70+
protected final SearchTransportService searchTransportService;
7171
private final Executor executor;
7272
private final ActionListener<SearchResponse> listener;
73-
private final SearchRequest request;
73+
protected final SearchRequest request;
7474

7575
/**
7676
* Used by subclasses to resolve node ids to {@link Transport.Connection}s.
7777
**/
7878
private final BiFunction<String, String, Transport.Connection> nodeIdToConnection;
79-
private final SearchTask task;
79+
protected final SearchTask task;
8080
protected final SearchPhaseResults<Result> results;
8181
private final long clusterStateVersion;
8282
private final TransportVersion minTransportVersion;
83-
private final Map<String, AliasFilter> aliasFilter;
84-
private final Map<String, Float> concreteIndexBoosts;
83+
protected final Map<String, AliasFilter> aliasFilter;
84+
protected final Map<String, Float> concreteIndexBoosts;
8585
private final SetOnce<AtomicArray<ShardSearchFailure>> shardFailures = new SetOnce<>();
8686
private final Object shardFailuresMutex = new Object();
8787
private final AtomicBoolean hasShardResponse = new AtomicBoolean(false);
8888
private final AtomicInteger successfulOps;
89-
private final SearchTimeProvider timeProvider;
89+
protected final SearchTimeProvider timeProvider;
9090
private final SearchResponse.Clusters clusters;
9191

9292
protected final List<SearchShardIterator> shardsIts;
93-
private final SearchShardIterator[] shardIterators;
93+
protected final SearchShardIterator[] shardIterators;
9494
private final AtomicInteger outstandingShards;
9595
private final int maxConcurrentRequestsPerNode;
9696
private final Map<String, PendingExecutions> pendingExecutionsPerNode = new ConcurrentHashMap<>();
@@ -230,10 +230,17 @@ protected final void run() {
230230
onPhaseDone();
231231
return;
232232
}
233+
if (shardsIts.isEmpty()) {
234+
return;
235+
}
233236
final Map<SearchShardIterator, Integer> shardIndexMap = Maps.newHashMapWithExpectedSize(shardIterators.length);
234237
for (int i = 0; i < shardIterators.length; i++) {
235238
shardIndexMap.put(shardIterators[i], i);
236239
}
240+
doRun(shardIndexMap);
241+
}
242+
243+
protected void doRun(Map<SearchShardIterator, Integer> shardIndexMap) {
237244
doCheckNoMissingShards(getName(), request, shardsIts);
238245
for (int i = 0; i < shardsIts.size(); i++) {
239246
final SearchShardIterator shardRoutings = shardsIts.get(i);
@@ -249,7 +256,7 @@ protected final void run() {
249256
}
250257
}
251258

252-
private void performPhaseOnShard(final int shardIndex, final SearchShardIterator shardIt, final SearchShardTarget shard) {
259+
protected final void performPhaseOnShard(final int shardIndex, final SearchShardIterator shardIt, final SearchShardTarget shard) {
253260
if (throttleConcurrentRequests) {
254261
var pendingExecutions = pendingExecutionsPerNode.computeIfAbsent(
255262
shard.getNodeId(),
@@ -289,7 +296,7 @@ public void onFailure(Exception e) {
289296
executePhaseOnShard(shardIt, connection, shardListener);
290297
}
291298

292-
private void failOnUnavailable(int shardIndex, SearchShardIterator shardIt) {
299+
protected final void failOnUnavailable(int shardIndex, SearchShardIterator shardIt) {
293300
SearchShardTarget unassignedShard = new SearchShardTarget(null, shardIt.shardId(), shardIt.getClusterAlias());
294301
onShardFailure(shardIndex, unassignedShard, shardIt, new NoShardAvailableActionException(shardIt.shardId()));
295302
}
@@ -396,7 +403,7 @@ private ShardSearchFailure[] buildShardFailures() {
396403
return failures;
397404
}
398405

399-
private void onShardFailure(final int shardIndex, SearchShardTarget shard, final SearchShardIterator shardIt, Exception e) {
406+
protected final void onShardFailure(final int shardIndex, SearchShardTarget shard, final SearchShardIterator shardIt, Exception e) {
400407
// we always add the shard failure for a specific shard instance
401408
// we do make sure to clean it on a successful response from a shard
402409
onShardFailure(shardIndex, shard, e);

0 commit comments

Comments
 (0)