Skip to content

Commit 1023131

Browse files
authored
Rework cancellation test for batched query execution (elastic#133579)
1 parent 3a7f712 commit 1023131

File tree

3 files changed, with 103 additions and 79 deletions.

server/src/internalClusterTest/java/org/elasticsearch/search/SearchCancellationIT.java

Lines changed: 94 additions & 70 deletions
Original file line number | Diff line number | Diff line change
@@ -29,7 +29,7 @@
2929
import org.elasticsearch.script.ScriptType;
3030
import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
3131
import org.elasticsearch.search.aggregations.metrics.ScriptedMetricAggregationBuilder;
32-
import org.elasticsearch.search.internal.ReaderContext;
32+
import org.elasticsearch.search.internal.SearchContext;
3333
import org.elasticsearch.tasks.Task;
3434
import org.elasticsearch.tasks.TaskCancelledException;
3535
import org.elasticsearch.test.AbstractSearchCancellationTestCase;
@@ -42,6 +42,7 @@
4242
import java.util.concurrent.CountDownLatch;
4343
import java.util.concurrent.TimeUnit;
4444
import java.util.concurrent.atomic.AtomicBoolean;
45+
import java.util.concurrent.atomic.AtomicReference;
4546

4647
import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
4748
import static org.elasticsearch.index.query.QueryBuilders.scriptQuery;
@@ -240,80 +241,103 @@ public void testCancelMultiSearch() throws Exception {
240241
}
241242

242243
public void testCancelFailedSearchWhenPartialResultDisallowed() throws Exception {
243-
// TODO: make this test compatible with batched execution, currently the exceptions are slightly different with batched
244-
updateClusterSettings(Settings.builder().put(SearchService.BATCHED_QUERY_PHASE.getKey(), false));
245-
// Have at least two nodes so that we have parallel execution of two request guaranteed even if max concurrent requests per node
246-
// are limited to 1
247-
internalCluster().ensureAtLeastNumDataNodes(2);
248-
int numberOfShards = between(2, 5);
249-
createIndex("test", numberOfShards, 0);
250-
indexTestData();
251-
252-
// Define (but don't run) the search request, expecting a partial shard failure. We will run it later.
253-
Thread searchThread = new Thread(() -> {
254-
logger.info("Executing search");
255-
SearchPhaseExecutionException e = expectThrows(
256-
SearchPhaseExecutionException.class,
257-
prepareSearch("test").setSearchType(SearchType.QUERY_THEN_FETCH)
258-
.setQuery(scriptQuery(new Script(ScriptType.INLINE, "mockscript", SEARCH_BLOCK_SCRIPT_NAME, Collections.emptyMap())))
259-
.setAllowPartialSearchResults(false)
260-
.setSize(1000)
261-
);
262-
assertThat(e.getMessage(), containsString("Partial shards failure"));
263-
});
264-
265-
// When the search request executes, block all shards except 1.
266-
final List<SearchShardBlockingPlugin> searchShardBlockingPlugins = initSearchShardBlockingPlugin();
267-
AtomicBoolean letOneShardProceed = new AtomicBoolean();
268-
// Ensure we have at least one task waiting on the latch
269-
CountDownLatch waitingTaskLatch = new CountDownLatch(1);
270-
CountDownLatch shardTaskLatch = new CountDownLatch(1);
271-
for (SearchShardBlockingPlugin plugin : searchShardBlockingPlugins) {
272-
plugin.setRunOnNewReaderContext((ReaderContext c) -> {
273-
if (letOneShardProceed.compareAndSet(false, true)) {
274-
// Let one shard continue.
275-
} else {
276-
// Signal that we have a task waiting on the latch
277-
waitingTaskLatch.countDown();
278-
safeAwait(shardTaskLatch); // Block the other shards.
279-
}
244+
boolean useBatched = randomBoolean();
245+
try {
246+
if (useBatched == false) { // It's true by default
247+
updateClusterSettings(Settings.builder().put(SearchService.BATCHED_QUERY_PHASE.getKey(), false));
248+
}
249+
// Have at least two nodes so that we have parallel execution of two request guaranteed even if max concurrent requests per node
250+
// are limited to 1
251+
internalCluster().ensureAtLeastNumDataNodes(2);
252+
int numberOfShards = between(2, 5);
253+
createIndex("test", numberOfShards, 0);
254+
indexTestData();
255+
256+
// Define (but don't run) the search request, expecting a partial shard failure. We will run it later.
257+
Thread searchThread = new Thread(() -> {
258+
logger.info("Executing search");
259+
SearchPhaseExecutionException e = expectThrows(
260+
SearchPhaseExecutionException.class,
261+
prepareSearch("test").setSearchType(SearchType.QUERY_THEN_FETCH)
262+
.setQuery(
263+
scriptQuery(new Script(ScriptType.INLINE, "mockscript", SEARCH_BLOCK_SCRIPT_NAME, Collections.emptyMap()))
264+
)
265+
.setAllowPartialSearchResults(false)
266+
.setSize(1000)
267+
);
268+
assertThat(e.getMessage(), containsString("Partial shards failure"));
280269
});
281-
}
282270

283-
// For the shard that was allowed to proceed, have a single query-execution thread throw an exception.
284-
final List<ScriptedBlockPlugin> plugins = initBlockFactory();
285-
AtomicBoolean oneThreadWillError = new AtomicBoolean();
286-
for (ScriptedBlockPlugin plugin : plugins) {
287-
plugin.disableBlock();
288-
plugin.setBeforeExecution(() -> {
289-
if (oneThreadWillError.compareAndSet(false, true)) {
290-
// wait for some task to get to the latch
291-
safeAwait(waitingTaskLatch);
292-
// then throw the exception
293-
throw new IllegalStateException("This will cancel the ContextIndexSearcher.search task");
294-
}
295-
});
296-
}
271+
// When the search request executes, allow some shards to proceed and block others
272+
final List<SearchShardBlockingPlugin> searchShardBlockingPlugins = initSearchShardBlockingPlugin();
273+
CountDownLatch waitingTaskLatch = new CountDownLatch(1);
274+
CountDownLatch shardTaskLatch = new CountDownLatch(1);
275+
final AtomicReference<String> selectedNodeId = new AtomicReference<>();
276+
final AtomicBoolean letOneShardProceed = new AtomicBoolean();
277+
for (SearchShardBlockingPlugin plugin : searchShardBlockingPlugins) {
278+
plugin.setRunOnPreQueryPhase((SearchContext c) -> {
279+
if (useBatched) { // Allow all the shards on one node to continue. Block all others.
280+
String nodeId = c.shardTarget().getNodeId();
281+
if (selectedNodeId.compareAndSet(null, nodeId) || nodeId.equals(selectedNodeId.get())) {
282+
logger.info("Allowing shard [{}] on node [{}] to proceed", c.shardTarget().getShardId(), nodeId);
283+
} else {
284+
logger.info("Blocking shard [{}] on node [{}]", c.shardTarget().getShardId(), nodeId);
285+
// Signal that we have a task waiting on the latch
286+
waitingTaskLatch.countDown();
287+
safeAwait(shardTaskLatch); // Block shards on other nodes
288+
}
289+
} else { // Allow one shard to continue. Block all others.
290+
if (letOneShardProceed.compareAndSet(false, true)) {
291+
logger.info("Allowing shard [{}] to proceed", c.shardTarget().getShardId());
292+
} else {
293+
logger.info("Blocking shard [{}]", c.shardTarget().getShardId());
294+
// Signal that we have a task waiting on the latch
295+
waitingTaskLatch.countDown();
296+
safeAwait(shardTaskLatch); // Block all other shards
297+
}
298+
}
299+
});
300+
}
297301

298-
// Now run the search request.
299-
logger.info("Starting search thread");
300-
searchThread.start();
302+
// For the shards that were allowed to proceed, have a single query-execution thread throw an exception.
303+
final List<ScriptedBlockPlugin> plugins = initBlockFactory();
304+
AtomicBoolean oneThreadWillError = new AtomicBoolean();
305+
for (ScriptedBlockPlugin plugin : plugins) {
306+
plugin.disableBlock();
307+
plugin.setBeforeExecution(() -> {
308+
if (oneThreadWillError.compareAndSet(false, true)) {
309+
// wait for some task to get to the latch
310+
safeAwait(waitingTaskLatch);
311+
// then throw the exception
312+
throw new IllegalStateException("This will cancel the ContextIndexSearcher.search task");
313+
}
314+
});
315+
}
301316

302-
try {
303-
assertBusy(() -> {
304-
final List<SearchTask> coordinatorSearchTask = getCoordinatorSearchTasks();
305-
logger.info("Checking tasks: {}", coordinatorSearchTask);
306-
assertThat("The Coordinator should have one SearchTask.", coordinatorSearchTask, hasSize(1));
307-
assertTrue("The SearchTask should be cancelled.", coordinatorSearchTask.get(0).isCancelled());
308-
for (var shardQueryTask : getShardQueryTasks()) {
309-
assertTrue("All SearchShardTasks should then be cancelled", shardQueryTask.isCancelled());
310-
}
311-
}, 30, TimeUnit.SECONDS);
317+
// Now run the search request.
318+
logger.info("Starting search thread");
319+
searchThread.start();
320+
321+
try {
322+
assertBusy(() -> {
323+
final List<SearchTask> coordinatorSearchTask = getCoordinatorSearchTasks();
324+
logger.info("Checking tasks: {}", coordinatorSearchTask);
325+
assertThat("The Coordinator should have one SearchTask.", coordinatorSearchTask, hasSize(1));
326+
assertTrue("The SearchTask should be cancelled.", coordinatorSearchTask.get(0).isCancelled());
327+
for (var shardQueryTask : getShardQueryTasks()) {
328+
assertTrue("All SearchShardTasks should then be cancelled", shardQueryTask.isCancelled());
329+
}
330+
}, 30, TimeUnit.SECONDS);
331+
} finally {
332+
shardTaskLatch.countDown(); // unblock the shardTasks, allowing the test to conclude.
333+
searchThread.join();
334+
plugins.forEach(plugin -> plugin.setBeforeExecution(() -> {}));
335+
searchShardBlockingPlugins.forEach(plugin -> plugin.setRunOnPreQueryPhase((SearchContext c) -> {}));
336+
}
312337
} finally {
313-
shardTaskLatch.countDown(); // unblock the shardTasks, allowing the test to conclude.
314-
searchThread.join();
315-
plugins.forEach(plugin -> plugin.setBeforeExecution(() -> {}));
316-
searchShardBlockingPlugins.forEach(plugin -> plugin.setRunOnNewReaderContext((ReaderContext c) -> {}));
338+
if (useBatched == false) {
339+
updateClusterSettings(Settings.builder().putNull(SearchService.BATCHED_QUERY_PHASE.getKey()));
340+
}
317341
}
318342
}
319343

server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -720,8 +720,8 @@ private void setFailure(QueryPerNodeState state, int dataNodeLocalIdx, Exception
720720

721721
@Override
722722
public void onFailure(Exception e) {
723-
// TODO: count down fully and just respond with an exception if partial results aren't allowed as an
724-
// optimization
723+
// Note: this shard won't be retried until it returns to the coordinating node where the shard iterator lives
724+
// TODO: consider alternatives that don't wait for the entire batch to complete before retrying the shard
725725
setFailure(state, dataNodeLocalIdx, e);
726726
doneFuture.onResponse(null);
727727
}

test/framework/src/main/java/org/elasticsearch/test/AbstractSearchCancellationTestCase.java

Lines changed: 7 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -26,7 +26,7 @@
2626
import org.elasticsearch.rest.RestStatus;
2727
import org.elasticsearch.script.MockScriptPlugin;
2828
import org.elasticsearch.search.SearchService;
29-
import org.elasticsearch.search.internal.ReaderContext;
29+
import org.elasticsearch.search.internal.SearchContext;
3030
import org.elasticsearch.search.lookup.LeafStoredFieldsLookup;
3131
import org.elasticsearch.tasks.TaskInfo;
3232
import org.junit.BeforeClass;
@@ -279,20 +279,20 @@ protected List<SearchShardBlockingPlugin> initSearchShardBlockingPlugin() {
279279
}
280280

281281
public static class SearchShardBlockingPlugin extends Plugin {
282-
private final AtomicReference<Consumer<ReaderContext>> runOnNewReaderContext = new AtomicReference<>();
282+
private final AtomicReference<Consumer<SearchContext>> runOnPreQueryPhase = new AtomicReference<>();
283283

284-
public void setRunOnNewReaderContext(Consumer<ReaderContext> consumer) {
285-
runOnNewReaderContext.set(consumer);
284+
public void setRunOnPreQueryPhase(Consumer<SearchContext> consumer) {
285+
runOnPreQueryPhase.set(consumer);
286286
}
287287

288288
@Override
289289
public void onIndexModule(IndexModule indexModule) {
290290
super.onIndexModule(indexModule);
291291
indexModule.addSearchOperationListener(new SearchOperationListener() {
292292
@Override
293-
public void onNewReaderContext(ReaderContext c) {
294-
if (runOnNewReaderContext.get() != null) {
295-
runOnNewReaderContext.get().accept(c);
293+
public void onPreQueryPhase(SearchContext c) {
294+
if (runOnPreQueryPhase.get() != null) {
295+
runOnPreQueryPhase.get().accept(c);
296296
}
297297
}
298298
});

0 commit comments

Comments
 (0)