Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,12 @@
import org.elasticsearch.action.search.TransportSearchAction;
import org.elasticsearch.action.search.TransportSearchScrollAction;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.script.Script;
import org.elasticsearch.script.ScriptType;
import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.ScriptedMetricAggregationBuilder;
import org.elasticsearch.search.internal.ReaderContext;
import org.elasticsearch.search.internal.SearchContext;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskCancelledException;
import org.elasticsearch.test.AbstractSearchCancellationTestCase;
Expand All @@ -42,6 +41,7 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
import static org.elasticsearch.index.query.QueryBuilders.scriptQuery;
Expand Down Expand Up @@ -240,8 +240,6 @@ public void testCancelMultiSearch() throws Exception {
}

public void testCancelFailedSearchWhenPartialResultDisallowed() throws Exception {
// TODO: make this test compatible with batched execution, currently the exceptions are slightly different with batched
updateClusterSettings(Settings.builder().put(SearchService.BATCHED_QUERY_PHASE.getKey(), false));
// Have at least two nodes so that parallel execution of two requests is guaranteed even if max concurrent requests per node
// is limited to 1
internalCluster().ensureAtLeastNumDataNodes(2);
Expand All @@ -262,25 +260,27 @@ public void testCancelFailedSearchWhenPartialResultDisallowed() throws Exception
assertThat(e.getMessage(), containsString("Partial shards failure"));
});

// When the search request executes, block all shards except 1.
// When the search request executes, allow shards from one node to proceed and block shards from other nodes.
final List<SearchShardBlockingPlugin> searchShardBlockingPlugins = initSearchShardBlockingPlugin();
AtomicBoolean letOneShardProceed = new AtomicBoolean();
// Ensure we have at least one task waiting on the latch
final AtomicReference<String> selectedNodeId = new AtomicReference<>();
CountDownLatch waitingTaskLatch = new CountDownLatch(1);
CountDownLatch shardTaskLatch = new CountDownLatch(1);
for (SearchShardBlockingPlugin plugin : searchShardBlockingPlugins) {
plugin.setRunOnNewReaderContext((ReaderContext c) -> {
if (letOneShardProceed.compareAndSet(false, true)) {
// Let one shard continue.
plugin.setRunOnPreQueryPhase((SearchContext c) -> {
String nodeId = c.shardTarget().getNodeId();
if (selectedNodeId.compareAndSet(null, nodeId) || nodeId.equals(selectedNodeId.get())) {
// Let shards on the selected node continue
logger.info("Allowing shard [{}] on node [{}] to proceed", c.shardTarget().getShardId(), nodeId);
} else {
// Signal that we have a task waiting on the latch
logger.info("Blocking shard [{}] on node [{}]", c.shardTarget().getShardId(), nodeId);
waitingTaskLatch.countDown();
safeAwait(shardTaskLatch); // Block the other shards.
safeAwait(shardTaskLatch); // Block shards on other nodes
}
});
}

// For the shard that was allowed to proceed, have a single query-execution thread throw an exception.
// For the shards that were allowed to proceed, have a single query-execution thread throw an exception.
final List<ScriptedBlockPlugin> plugins = initBlockFactory();
AtomicBoolean oneThreadWillError = new AtomicBoolean();
for (ScriptedBlockPlugin plugin : plugins) {
Expand Down Expand Up @@ -313,7 +313,7 @@ public void testCancelFailedSearchWhenPartialResultDisallowed() throws Exception
shardTaskLatch.countDown(); // unblock the shardTasks, allowing the test to conclude.
searchThread.join();
plugins.forEach(plugin -> plugin.setBeforeExecution(() -> {}));
searchShardBlockingPlugins.forEach(plugin -> plugin.setRunOnNewReaderContext((ReaderContext c) -> {}));
searchShardBlockingPlugins.forEach(plugin -> plugin.setRunOnPreQueryPhase((SearchContext c) -> {}));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -720,8 +720,8 @@ private void setFailure(QueryPerNodeState state, int dataNodeLocalIdx, Exception

@Override
public void onFailure(Exception e) {
// Failure callback: records the exception for the entry at dataNodeLocalIdx, then completes doneFuture.
// TODO: count down fully and just respond with an exception if partial results aren't allowed as an
// optimization
// Note: this shard won't be retried until it returns to the coordinating node where the shard iterator lives
// TODO: consider alternatives that don't wait for the entire batch to complete before retrying the shard
// Store the failure against this entry's index in the shared state object.
setFailure(state, dataNodeLocalIdx, e);
// Complete doneFuture with a null response even on failure; the exception itself travels via setFailure above.
// NOTE(review): presumably doneFuture tracks per-entry completion for the batch — confirm against its declaration.
doneFuture.onResponse(null);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.script.MockScriptPlugin;
import org.elasticsearch.search.SearchService;
import org.elasticsearch.search.internal.ReaderContext;
import org.elasticsearch.search.internal.SearchContext;
import org.elasticsearch.search.lookup.LeafStoredFieldsLookup;
import org.elasticsearch.tasks.TaskInfo;
import org.junit.BeforeClass;
Expand Down Expand Up @@ -279,20 +279,20 @@ protected List<SearchShardBlockingPlugin> initSearchShardBlockingPlugin() {
}

public static class SearchShardBlockingPlugin extends Plugin {
private final AtomicReference<Consumer<ReaderContext>> runOnNewReaderContext = new AtomicReference<>();
private final AtomicReference<Consumer<SearchContext>> runOnPreQueryPhase = new AtomicReference<>();

public void setRunOnNewReaderContext(Consumer<ReaderContext> consumer) {
runOnNewReaderContext.set(consumer);
public void setRunOnPreQueryPhase(Consumer<SearchContext> consumer) {
runOnPreQueryPhase.set(consumer);
}

@Override
public void onIndexModule(IndexModule indexModule) {
super.onIndexModule(indexModule);
indexModule.addSearchOperationListener(new SearchOperationListener() {
@Override
public void onNewReaderContext(ReaderContext c) {
if (runOnNewReaderContext.get() != null) {
runOnNewReaderContext.get().accept(c);
public void onPreQueryPhase(SearchContext c) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I made this change because SearchContext gave me easier access to the node ID.

Both the onNewReaderContext and onPreQueryPhase hooks run before query execution begins, so either will do the job for this test (the only user of SearchShardBlockingPlugin).

if (runOnPreQueryPhase.get() != null) {
runOnPreQueryPhase.get().accept(c);
}
}
});
Expand Down