Skip to content

Commit cec0c0f

Browse files
escpae hatch
1 parent f429eef commit cec0c0f

File tree

9 files changed

+40
-16
lines changed

9 files changed

+40
-16
lines changed

server/src/internalClusterTest/java/org/elasticsearch/action/admin/cluster/node/tasks/TasksIT.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
import org.elasticsearch.index.query.QueryBuilders;
4343
import org.elasticsearch.persistent.PersistentTasksCustomMetadata;
4444
import org.elasticsearch.plugins.Plugin;
45+
import org.elasticsearch.search.SearchService;
4546
import org.elasticsearch.search.builder.SearchSourceBuilder;
4647
import org.elasticsearch.tasks.RemovedTaskListener;
4748
import org.elasticsearch.tasks.Task;
@@ -352,8 +353,8 @@ public void testTransportBulkTasks() {
352353
assertParentTask(findEvents(TransportBulkAction.NAME + "[s][r]", Tuple::v1), shardTask);
353354
}
354355

355-
@AwaitsFix(bugUrl = "TODO adjust")
356356
public void testSearchTaskDescriptions() {
357+
updateClusterSettings(Settings.builder().put(SearchService.BATCHED_QUERY_PHASE.getKey(), false));
357358
registerTaskManagerListeners(TransportSearchAction.TYPE.name()); // main task
358359
registerTaskManagerListeners(TransportSearchAction.TYPE.name() + "[*]"); // shard task
359360
createIndex("test");
@@ -401,7 +402,7 @@ public void testSearchTaskDescriptions() {
401402
// assert that all task descriptions have non-zero length
402403
assertThat(taskInfo.description().length(), greaterThan(0));
403404
}
404-
405+
updateClusterSettings(Settings.builder().putNull(SearchService.BATCHED_QUERY_PHASE.getKey()));
405406
}
406407

407408
public void testSearchTaskHeaderLimit() {

server/src/internalClusterTest/java/org/elasticsearch/action/search/TransportSearchIT.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
import org.elasticsearch.rest.RestStatus;
4141
import org.elasticsearch.search.DocValueFormat;
4242
import org.elasticsearch.search.SearchHit;
43+
import org.elasticsearch.search.SearchService;
4344
import org.elasticsearch.search.aggregations.AbstractAggregationBuilder;
4445
import org.elasticsearch.search.aggregations.AggregationBuilder;
4546
import org.elasticsearch.search.aggregations.AggregationExecutionContext;
@@ -444,8 +445,8 @@ public void testSearchIdle() throws Exception {
444445
);
445446
}
446447

447-
@AwaitsFix(bugUrl = "TODO")
448448
public void testCircuitBreakerReduceFail() throws Exception {
449+
updateClusterSettings(Settings.builder().put(SearchService.BATCHED_QUERY_PHASE.getKey(), false));
449450
int numShards = randomIntBetween(1, 10);
450451
indexSomeDocs("test", numShards, numShards * 3);
451452

@@ -519,7 +520,9 @@ public void onFailure(Exception exc) {
519520
}
520521
assertBusy(() -> assertThat(requestBreakerUsed(), equalTo(0L)));
521522
} finally {
522-
updateClusterSettings(Settings.builder().putNull("indices.breaker.request.limit"));
523+
updateClusterSettings(
524+
Settings.builder().putNull("indices.breaker.request.limit").putNull(SearchService.BATCHED_QUERY_PHASE.getKey())
525+
);
523526
}
524527
}
525528

server/src/internalClusterTest/java/org/elasticsearch/search/SearchCancellationIT.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.elasticsearch.action.search.TransportSearchAction;
2424
import org.elasticsearch.action.search.TransportSearchScrollAction;
2525
import org.elasticsearch.common.Strings;
26+
import org.elasticsearch.common.settings.Settings;
2627
import org.elasticsearch.core.TimeValue;
2728
import org.elasticsearch.script.Script;
2829
import org.elasticsearch.script.ScriptType;
@@ -238,8 +239,8 @@ public void testCancelMultiSearch() throws Exception {
238239
}
239240
}
240241

241-
@AwaitsFix(bugUrl = "TODO needs update to work with concurrent search and batched exec")
242242
public void testCancelFailedSearchWhenPartialResultDisallowed() throws Exception {
243+
updateClusterSettings(Settings.builder().put(SearchService.BATCHED_QUERY_PHASE.getKey(), false));
243244
// Have at least two nodes so that we have parallel execution of two request guaranteed even if max concurrent requests per node
244245
// are limited to 1
245246
internalCluster().ensureAtLeastNumDataNodes(2);

server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,7 @@ public class SearchQueryThenFetchAsyncAction extends AbstractSearchAsyncAction<S
8888
private final int trackTotalHitsUpTo;
8989
private volatile BottomSortValuesCollector bottomSortCollector;
9090
private final Client client;
91+
private final boolean batchQueryPhase;
9192

9293
SearchQueryThenFetchAsyncAction(
9394
Logger logger,
@@ -105,7 +106,8 @@ public class SearchQueryThenFetchAsyncAction extends AbstractSearchAsyncAction<S
105106
ClusterState clusterState,
106107
SearchTask task,
107108
SearchResponse.Clusters clusters,
108-
Client client
109+
Client client,
110+
boolean batchQueryPhase
109111
) {
110112
super(
111113
"query",
@@ -130,7 +132,7 @@ public class SearchQueryThenFetchAsyncAction extends AbstractSearchAsyncAction<S
130132
this.trackTotalHitsUpTo = request.resolveTrackTotalHitsUpTo();
131133
this.progressListener = task.getProgressListener();
132134
this.client = client;
133-
135+
this.batchQueryPhase = batchQueryPhase;
134136
// don't build the SearchShard list (can be expensive) if the SearchProgressListener won't use it
135137
if (progressListener != SearchProgressListener.NOOP) {
136138
notifyListShards(progressListener, clusters, request.source());
@@ -415,7 +417,7 @@ protected void doRun(Map<SearchShardIterator, Integer> shardIndexMap) {
415417
} else {
416418
final String nodeId = routing.getNodeId();
417419
// local requests don't need batching as there's no network latency
418-
if (localNodeId.equals(nodeId) == false) {
420+
if (this.batchQueryPhase && localNodeId.equals(nodeId) == false) {
419421
perNodeQueries.computeIfAbsent(
420422
new CanMatchPreFilterSearchPhase.SendingTarget(routing.getClusterAlias(), routing.getNodeId()),
421423
ignored -> new NodeQueryRequest(

server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1566,7 +1566,8 @@ public void runNewSearchPhase(
15661566
clusterState,
15671567
task,
15681568
clusters,
1569-
client
1569+
client,
1570+
searchService.batchQueryPhase()
15701571
);
15711572
}
15721573
success = true;

server/src/main/java/org/elasticsearch/common/lucene/Lucene.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -485,22 +485,22 @@ public static void writeTopDocs(StreamOutput out, TopDocsAndMaxScore topDocs) th
485485
if (topDocs.topDocs instanceof TopFieldGroups topFieldGroups) {
486486
out.writeByte((byte) 2);
487487

488-
writeTotalHits(out, topDocs.topDocs.totalHits);
488+
writeTotalHits(out, topFieldGroups.totalHits);
489489
out.writeFloat(topDocs.maxScore);
490490

491491
out.writeString(topFieldGroups.field);
492492
out.writeArray(Lucene::writeSortField, topFieldGroups.fields);
493493

494-
out.writeVInt(topDocs.topDocs.scoreDocs.length);
495-
for (int i = 0; i < topDocs.topDocs.scoreDocs.length; i++) {
494+
out.writeVInt(topFieldGroups.scoreDocs.length);
495+
for (int i = 0; i < topFieldGroups.scoreDocs.length; i++) {
496496
ScoreDoc doc = topFieldGroups.scoreDocs[i];
497497
writeFieldDoc(out, (FieldDoc) doc);
498498
writeSortValue(out, topFieldGroups.groupValues[i]);
499499
}
500500
} else if (topDocs.topDocs instanceof TopFieldDocs topFieldDocs) {
501501
out.writeByte((byte) 1);
502502

503-
writeTotalHits(out, topDocs.topDocs.totalHits);
503+
writeTotalHits(out, topFieldDocs.totalHits);
504504
out.writeFloat(topDocs.maxScore);
505505

506506
out.writeArray(Lucene::writeSortField, topFieldDocs.fields);

server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -479,6 +479,7 @@ public void apply(Settings value, Settings current, Settings previous) {
479479
SearchService.ALLOW_EXPENSIVE_QUERIES,
480480
SearchService.CCS_VERSION_CHECK_SETTING,
481481
SearchService.CCS_COLLECT_TELEMETRY,
482+
SearchService.BATCHED_QUERY_PHASE,
482483
MultiBucketConsumerService.MAX_BUCKET_SETTING,
483484
SearchService.LOW_LEVEL_CANCELLATION_SETTING,
484485
SearchService.MAX_OPEN_SCROLL_CONTEXT,

server/src/main/java/org/elasticsearch/search/SearchService.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -274,6 +274,13 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
274274
Property.NodeScope
275275
);
276276

277+
public static final Setting<Boolean> BATCHED_QUERY_PHASE = Setting.boolSetting(
278+
"search.batched_query_phase",
279+
true,
280+
Property.Dynamic,
281+
Property.NodeScope
282+
);
283+
277284
public static final int DEFAULT_SIZE = 10;
278285
public static final int DEFAULT_FROM = 0;
279286
private static final StackTraceElement[] EMPTY_STACK_TRACE_ARRAY = new StackTraceElement[0];
@@ -300,6 +307,8 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
300307

301308
private volatile TimeValue defaultSearchTimeout;
302309

310+
private volatile boolean batchQueryPhase;
311+
303312
private final int minimumDocsPerSlice;
304313

305314
private volatile boolean defaultAllowPartialSearchResults;
@@ -390,6 +399,9 @@ public SearchService(
390399
enableQueryPhaseParallelCollection = QUERY_PHASE_PARALLEL_COLLECTION_ENABLED.get(settings);
391400
clusterService.getClusterSettings()
392401
.addSettingsUpdateConsumer(QUERY_PHASE_PARALLEL_COLLECTION_ENABLED, this::setEnableQueryPhaseParallelCollection);
402+
batchQueryPhase = BATCHED_QUERY_PHASE.get(settings);
403+
clusterService.getClusterSettings()
404+
.addSettingsUpdateConsumer(BATCHED_QUERY_PHASE, bulkExecuteQueryPhase -> this.batchQueryPhase = bulkExecuteQueryPhase);
393405
}
394406

395407
private void setEnableSearchWorkerThreads(boolean enableSearchWorkerThreads) {
@@ -452,6 +464,10 @@ private void setEnableRewriteAggsToFilterByFilter(boolean enableRewriteAggsToFil
452464
this.enableRewriteAggsToFilterByFilter = enableRewriteAggsToFilterByFilter;
453465
}
454466

467+
public boolean batchQueryPhase() {
468+
return batchQueryPhase;
469+
}
470+
455471
@Override
456472
public void afterIndexRemoved(Index index, IndexSettings indexSettings, IndexRemovalReason reason) {
457473
// once an index is removed due to deletion or closing, we can just clean up all the pending search context information

server/src/test/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncActionTests.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@
1313
import org.apache.lucene.search.SortField;
1414
import org.apache.lucene.search.TopFieldDocs;
1515
import org.apache.lucene.search.TotalHits;
16-
import org.apache.lucene.tests.util.LuceneTestCase;
1716
import org.elasticsearch.action.ActionListener;
1817
import org.elasticsearch.action.OriginalIndices;
1918
import org.elasticsearch.cluster.ClusterName;
@@ -56,7 +55,6 @@
5655
import static org.mockito.Mockito.mock;
5756
import static org.mockito.Mockito.when;
5857

59-
@LuceneTestCase.AwaitsFix(bugUrl = "makes limited sense with batched execution")
6058
public class SearchQueryThenFetchAsyncActionTests extends ESTestCase {
6159
public void testBottomFieldSort() throws Exception {
6260
testCase(false, false);
@@ -208,7 +206,8 @@ public void sendExecuteQuery(
208206
new ClusterState.Builder(new ClusterName("test")).build(),
209207
task,
210208
SearchResponse.Clusters.EMPTY,
211-
null
209+
null,
210+
false
212211
) {
213212
@Override
214213
protected SearchPhase getNextPhase() {

0 commit comments

Comments
 (0)