From e9ed8bba36e630435de18ede43fd24b10860557e Mon Sep 17 00:00:00 2001 From: Luca Cavanna Date: Wed, 16 Apr 2025 14:21:59 +0200 Subject: [PATCH] Cancel expired async search task when a remote returns its results (#126583) A while ago we enabled using ccs_minimize_roundtrips in async search. This makes it possible for users of async search to send a single search request per remote cluster, and minimize the impact of network latency. With non-minimized roundtrips, we have fairly frequent cancellation checks: as part of the execution, we detect that a task has expired whenever a shard comes back with its results. In a scenario where the coordinating node does not hold data, or only remote data is targeted by an async search, we have far fewer opportunities to detect cancellation if roundtrips are minimized. The local coordinator would do nothing other than wait for the minimized results from each remote cluster. One scenario where we can check for cancellation is when each cluster comes back with its full set of results. This commit adds such a check, plus some testing for async search cancellation with minimized roundtrips. 
--- docs/changelog/126583.yaml | 5 + .../CCSUsageTelemetryAsyncSearchIT.java | 15 +- .../search/CrossClusterAsyncSearchIT.java | 332 +++++++++++++++--- .../xpack/search/AsyncSearchTask.java | 1 + 4 files changed, 291 insertions(+), 62 deletions(-) create mode 100644 docs/changelog/126583.yaml diff --git a/docs/changelog/126583.yaml b/docs/changelog/126583.yaml new file mode 100644 index 0000000000000..a6732b7936f8a --- /dev/null +++ b/docs/changelog/126583.yaml @@ -0,0 +1,5 @@ +pr: 126583 +summary: Cancel expired async search task when a remote returns its results +area: CCS +type: bug +issues: [] diff --git a/x-pack/plugin/async-search/src/internalClusterTest/java/org/elasticsearch/xpack/search/CCSUsageTelemetryAsyncSearchIT.java b/x-pack/plugin/async-search/src/internalClusterTest/java/org/elasticsearch/xpack/search/CCSUsageTelemetryAsyncSearchIT.java index 1b19f6f04693b..19eaaf63c552a 100644 --- a/x-pack/plugin/async-search/src/internalClusterTest/java/org/elasticsearch/xpack/search/CCSUsageTelemetryAsyncSearchIT.java +++ b/x-pack/plugin/async-search/src/internalClusterTest/java/org/elasticsearch/xpack/search/CCSUsageTelemetryAsyncSearchIT.java @@ -87,7 +87,7 @@ public void resetSearchListenerPlugin() { } private SubmitAsyncSearchRequest makeSearchRequest(String... 
indices) { - CrossClusterAsyncSearchIT.SearchListenerPlugin.blockQueryPhase(); + CrossClusterAsyncSearchIT.SearchListenerPlugin.blockLocalQueryPhase(); SubmitAsyncSearchRequest request = new SubmitAsyncSearchRequest(indices); request.setCcsMinimizeRoundtrips(randomBoolean()); @@ -220,7 +220,8 @@ public void testCancelledSearch() throws Exception { String remoteIndex = (String) testClusterInfo.get("remote.index"); SubmitAsyncSearchRequest searchRequest = makeSearchRequest(localIndex, REMOTE1 + ":" + remoteIndex); - CrossClusterAsyncSearchIT.SearchListenerPlugin.blockQueryPhase(); + CrossClusterAsyncSearchIT.SearchListenerPlugin.blockLocalQueryPhase(); + CrossClusterAsyncSearchIT.SearchListenerPlugin.blockRemoteQueryPhase(); String nodeName = cluster(LOCAL_CLUSTER).getRandomNodeName(); final AsyncSearchResponse response = cluster(LOCAL_CLUSTER).client(nodeName) @@ -232,7 +233,8 @@ public void testCancelledSearch() throws Exception { response.decRef(); assertTrue(response.isRunning()); } - CrossClusterAsyncSearchIT.SearchListenerPlugin.waitSearchStarted(); + CrossClusterAsyncSearchIT.SearchListenerPlugin.waitLocalSearchStarted(); + CrossClusterAsyncSearchIT.SearchListenerPlugin.waitRemoteSearchStarted(); ActionFuture cancelFuture; try { @@ -290,7 +292,8 @@ public void testCancelledSearch() throws Exception { assertTrue(taskInfo.description(), taskInfo.cancelled()); } } finally { - CrossClusterAsyncSearchIT.SearchListenerPlugin.allowQueryPhase(); + CrossClusterAsyncSearchIT.SearchListenerPlugin.allowLocalQueryPhase(); + CrossClusterAsyncSearchIT.SearchListenerPlugin.allowRemoteQueryPhase(); } assertBusy(() -> assertTrue(cancelFuture.isDone())); @@ -314,7 +317,7 @@ private CCSTelemetrySnapshot getTelemetrySnapshot(String nodeName) { } private Map setupClusters() { - String localIndex = "demo"; + String localIndex = "local"; int numShardsLocal = randomIntBetween(2, 10); Settings localSettings = indexSettings(numShardsLocal, randomIntBetween(0, 1)).build(); assertAcked( 
@@ -326,7 +329,7 @@ private Map setupClusters() { ); indexDocs(client(LOCAL_CLUSTER), localIndex); - String remoteIndex = "prod"; + String remoteIndex = "remote"; int numShardsRemote = randomIntBetween(2, 10); for (String clusterAlias : remoteClusterAlias()) { final InternalTestCluster remoteCluster = cluster(clusterAlias); diff --git a/x-pack/plugin/async-search/src/internalClusterTest/java/org/elasticsearch/xpack/search/CrossClusterAsyncSearchIT.java b/x-pack/plugin/async-search/src/internalClusterTest/java/org/elasticsearch/xpack/search/CrossClusterAsyncSearchIT.java index 2a8daf8bfe12c..d951b21ba1380 100644 --- a/x-pack/plugin/async-search/src/internalClusterTest/java/org/elasticsearch/xpack/search/CrossClusterAsyncSearchIT.java +++ b/x-pack/plugin/async-search/src/internalClusterTest/java/org/elasticsearch/xpack/search/CrossClusterAsyncSearchIT.java @@ -11,6 +11,8 @@ import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.action.ActionFuture; import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksRequest; +import org.elasticsearch.action.admin.cluster.node.tasks.get.GetTaskRequest; +import org.elasticsearch.action.admin.cluster.node.tasks.get.GetTaskResponse; import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.search.SearchType; @@ -67,7 +69,6 @@ import java.util.List; import java.util.Map; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; @@ -141,7 +142,7 @@ public void testClusterDetailsAfterSuccessfulCCS() throws Exception { int localNumShards = (Integer) testClusterInfo.get("local.num_shards"); int remoteNumShards = (Integer) testClusterInfo.get("remote.num_shards"); - SearchListenerPlugin.blockQueryPhase(); + 
SearchListenerPlugin.blockLocalQueryPhase(); SubmitAsyncSearchRequest request = new SubmitAsyncSearchRequest(localIndex, REMOTE_CLUSTER + ":" + remoteIndex); request.setCcsMinimizeRoundtrips(randomBoolean()); @@ -177,8 +178,8 @@ public void testClusterDetailsAfterSuccessfulCCS() throws Exception { response.decRef(); } - SearchListenerPlugin.waitSearchStarted(); - SearchListenerPlugin.allowQueryPhase(); + SearchListenerPlugin.waitLocalSearchStarted(); + SearchListenerPlugin.allowLocalQueryPhase(); waitForSearchTasksToFinish(); final AsyncSearchResponse finishedResponse = getAsyncSearch(responseId); @@ -262,7 +263,7 @@ public void testCCSClusterDetailsWhereAllShardsSkippedInCanMatch() throws Except int localNumShards = (Integer) testClusterInfo.get("local.num_shards"); int remoteNumShards = (Integer) testClusterInfo.get("remote.num_shards"); - SearchListenerPlugin.blockQueryPhase(); + SearchListenerPlugin.blockLocalQueryPhase(); SubmitAsyncSearchRequest request = new SubmitAsyncSearchRequest(localIndex, REMOTE_CLUSTER + ":" + remoteIndex); request.setCcsMinimizeRoundtrips(randomBoolean()); @@ -311,9 +312,9 @@ public void testCCSClusterDetailsWhereAllShardsSkippedInCanMatch() throws Except response.decRef(); } if (dfs) { - SearchListenerPlugin.waitSearchStarted(); + SearchListenerPlugin.waitLocalSearchStarted(); } - SearchListenerPlugin.allowQueryPhase(); + SearchListenerPlugin.allowLocalQueryPhase(); waitForSearchTasksToFinish(); final AsyncSearchResponse finishedResponse = getAsyncSearch(responseId); @@ -514,7 +515,7 @@ public void testClusterDetailsAfterCCSWithFailuresOnOneShardOnly() throws Except int localNumShards = (Integer) testClusterInfo.get("local.num_shards"); int remoteNumShards = (Integer) testClusterInfo.get("remote.num_shards"); - SearchListenerPlugin.blockQueryPhase(); + SearchListenerPlugin.blockLocalQueryPhase(); SubmitAsyncSearchRequest request = new SubmitAsyncSearchRequest(localIndex, REMOTE_CLUSTER + ":" + remoteIndex); 
request.setCcsMinimizeRoundtrips(randomBoolean()); @@ -550,8 +551,8 @@ public void testClusterDetailsAfterCCSWithFailuresOnOneShardOnly() throws Except response.decRef(); } - SearchListenerPlugin.waitSearchStarted(); - SearchListenerPlugin.allowQueryPhase(); + SearchListenerPlugin.waitLocalSearchStarted(); + SearchListenerPlugin.allowLocalQueryPhase(); waitForSearchTasksToFinish(); @@ -652,7 +653,7 @@ public void testClusterDetailsAfterCCSWithFailuresOnOneClusterOnly() throws Exce int remoteNumShards = (Integer) testClusterInfo.get("remote.num_shards"); boolean skipUnavailable = (Boolean) testClusterInfo.get("remote.skip_unavailable"); - SearchListenerPlugin.blockQueryPhase(); + SearchListenerPlugin.blockLocalQueryPhase(); SubmitAsyncSearchRequest request = new SubmitAsyncSearchRequest(localIndex, REMOTE_CLUSTER + ":" + remoteIndex); request.setCcsMinimizeRoundtrips(randomBoolean()); @@ -695,8 +696,8 @@ public void testClusterDetailsAfterCCSWithFailuresOnOneClusterOnly() throws Exce response.decRef(); } - SearchListenerPlugin.waitSearchStarted(); - SearchListenerPlugin.allowQueryPhase(); + SearchListenerPlugin.waitLocalSearchStarted(); + SearchListenerPlugin.allowLocalQueryPhase(); waitForSearchTasksToFinish(); @@ -815,7 +816,7 @@ public void testClusterDetailsAfterCCSWhereRemoteClusterHasNoShardsToSearch() th String localIndex = (String) testClusterInfo.get("local.index"); int localNumShards = (Integer) testClusterInfo.get("local.num_shards"); - SearchListenerPlugin.blockQueryPhase(); + SearchListenerPlugin.blockLocalQueryPhase(); // query against a missing index on the remote cluster SubmitAsyncSearchRequest request = new SubmitAsyncSearchRequest(localIndex, REMOTE_CLUSTER + ":" + "no_such_index*"); @@ -856,8 +857,8 @@ public void testClusterDetailsAfterCCSWhereRemoteClusterHasNoShardsToSearch() th response.decRef(); } - SearchListenerPlugin.waitSearchStarted(); - SearchListenerPlugin.allowQueryPhase(); + SearchListenerPlugin.waitLocalSearchStarted(); + 
SearchListenerPlugin.allowLocalQueryPhase(); waitForSearchTasksToFinish(); @@ -1325,12 +1326,198 @@ public void testRemoteClusterOnlyCCSWithFailuresOnAllShards() throws Exception { } } + /** + * This test verifies that get async search triggers an automatic task cancellation when trying to retrieve + * results for an expired async search + */ + public void testCancelViaExpirationOnGetAsyncSearchWithMinimizeRoundtrips() throws Exception { + Map testClusterInfo = setupTwoClusters(); + String localIndex = (String) testClusterInfo.get("local.index"); + String remoteIndex = (String) testClusterInfo.get("remote.index"); + + SearchListenerPlugin.blockLocalQueryPhase(); + SearchListenerPlugin.blockRemoteQueryPhase(); + + SubmitAsyncSearchRequest request = new SubmitAsyncSearchRequest(localIndex, REMOTE_CLUSTER + ":" + remoteIndex); + request.setCcsMinimizeRoundtrips(true); + request.setWaitForCompletionTimeout(TimeValue.timeValueMillis(1)); + request.setKeepAlive(new TimeValue(1, TimeUnit.SECONDS)); + request.setKeepOnCompletion(true); + + final AsyncSearchResponse response = submitAsyncSearch(request); + try { + assertNotNull(response.getSearchResponse()); + } finally { + response.decRef(); + assertTrue(response.isRunning()); + } + + SearchListenerPlugin.waitLocalSearchStarted(); + SearchListenerPlugin.waitRemoteSearchStarted(); + + ListTasksResponse listTasksResponse = client(LOCAL_CLUSTER).admin() + .cluster() + .prepareListTasks() + .setActions(TransportSearchAction.TYPE.name()) + .get(); + List localSearchTasks = listTasksResponse.getTasks(); + assertThat(localSearchTasks.size(), equalTo(1)); + TaskInfo localSearchTask = localSearchTasks.get(0); + assertFalse("taskInfo on local cluster should not be cancelled yet: " + localSearchTask, localSearchTask.cancelled()); + + AtomicReference remoteClusterSearchTask = new AtomicReference<>(); + assertBusy(() -> { + List remoteSearchTasks = client(REMOTE_CLUSTER).admin() + .cluster() + .prepareListTasks() + 
.setActions(TransportSearchAction.TYPE.name()) + .get() + .getTasks(); + assertThat(remoteSearchTasks.size(), equalTo(1)); + remoteClusterSearchTask.set(remoteSearchTasks.getFirst()); + }); + assertFalse( + "taskInfo on remote cluster should not be cancelled yet: " + remoteClusterSearchTask.get(), + remoteClusterSearchTask.get().cancelled() + ); + + // wait until the async search has expired (takes one second - keep alive can't be set lower than 1s) + // the get async search that returns 404 will also cancel the task as it is expired + assertBusy(() -> { + expectThrows(ResourceNotFoundException.class, () -> { + AsyncSearchResponse asyncSearchResponse = getAsyncSearch(response.getId()); + asyncSearchResponse.decRef(); + }); + }); + + try { + assertBusy(() -> { + // check that the tasks are cancelled + GetTaskResponse getLocalTaskResponse = client(LOCAL_CLUSTER).admin() + .cluster() + .getTask(new GetTaskRequest().setTaskId(localSearchTask.taskId())) + .get(); + assertTrue(getLocalTaskResponse.getTask().getTask().cancelled()); + GetTaskResponse getRemoteTaskResponse = client(REMOTE_CLUSTER).admin() + .cluster() + .getTask(new GetTaskRequest().setTaskId(remoteClusterSearchTask.get().taskId())) + .get(); + assertTrue(getRemoteTaskResponse.getTask().getTask().cancelled()); + }); + } finally { + SearchListenerPlugin.allowRemoteQueryPhase(); + SearchListenerPlugin.allowLocalQueryPhase(); + waitForSearchTasksToFinish(); + } + } + + public void testCancelViaExpirationOnRemoteResultsWithMinimizeRoundtrips() throws Exception { + Map testClusterInfo = setupTwoClusters(); + String localIndex = (String) testClusterInfo.get("local.index"); + String remoteIndex = (String) testClusterInfo.get("remote.index"); + + SearchListenerPlugin.blockLocalQueryPhase(); + SearchListenerPlugin.blockRemoteQueryPhase(); + + SubmitAsyncSearchRequest request = new SubmitAsyncSearchRequest(localIndex, REMOTE_CLUSTER + ":" + remoteIndex); + request.setCcsMinimizeRoundtrips(true); + 
request.setWaitForCompletionTimeout(TimeValue.timeValueMillis(1)); + request.setKeepAlive(new TimeValue(1, TimeUnit.SECONDS)); + request.setKeepOnCompletion(true); + + final AsyncSearchResponse response = submitAsyncSearch(request); + try { + assertNotNull(response.getSearchResponse()); + } finally { + response.decRef(); + assertTrue(response.isRunning()); + } + + SearchListenerPlugin.waitLocalSearchStarted(); + SearchListenerPlugin.waitRemoteSearchStarted(); + + ListTasksResponse listTasksResponse = client(LOCAL_CLUSTER).admin() + .cluster() + .prepareListTasks() + .setActions(TransportSearchAction.TYPE.name()) + .get(); + List localSearchTasks = listTasksResponse.getTasks(); + assertThat(localSearchTasks.size(), equalTo(1)); + TaskInfo localSearchTask = localSearchTasks.get(0); + assertFalse("taskInfo on local cluster should not be cancelled yet: " + localSearchTask, localSearchTask.cancelled()); + + AtomicReference remoteClusterSearchTask = new AtomicReference<>(); + assertBusy(() -> { + List remoteSearchTasks = client(REMOTE_CLUSTER).admin() + .cluster() + .prepareListTasks() + .setActions(TransportSearchAction.TYPE.name()) + .get() + .getTasks(); + assertThat(remoteSearchTasks.size(), equalTo(1)); + remoteClusterSearchTask.set(remoteSearchTasks.getFirst()); + }); + assertFalse( + "taskInfo on remote cluster should not be cancelled yet: " + remoteClusterSearchTask.get(), + remoteClusterSearchTask.get().cancelled() + ); + + AsyncSearchResponse asyncSearchResponse = getAsyncSearch(response.getId()); + asyncSearchResponse.decRef(); + + // wait until the async search has expired (takes one second - keep alive can't be set lower than 1s) + // don't call get async search as that triggers cancellation of the task - we want to verify that we can cancel it + // as we get results from a remote cluster + assertBusy(() -> assertThat(System.currentTimeMillis(), greaterThanOrEqualTo(asyncSearchResponse.getExpirationTime()))); + + { + // check that the tasks are cancelled + 
GetTaskResponse getLocalTaskResponse = client(LOCAL_CLUSTER).admin() + .cluster() + .getTask(new GetTaskRequest().setTaskId(localSearchTask.taskId())) + .get(); + assertFalse(getLocalTaskResponse.getTask().getTask().cancelled()); + GetTaskResponse getRemoteTaskResponse = client(REMOTE_CLUSTER).admin() + .cluster() + .getTask(new GetTaskRequest().setTaskId(remoteClusterSearchTask.get().taskId())) + .get(); + assertFalse(getRemoteTaskResponse.getTask().getTask().cancelled()); + } + + // unblock the remote query phase, but not the local one: we want to test that getting results from a remote cluster triggers + // cancellation given the async search has expired + SearchListenerPlugin.allowRemoteQueryPhase(); + + try { + assertBusy(() -> { + // check that the tasks are cancelled - they get cancelled because we check for cancellation in + // AsyncSearchTask#onClusterResponseMinimizeRoundtrips + GetTaskResponse getLocalTaskResponse = client(LOCAL_CLUSTER).admin() + .cluster() + .getTask(new GetTaskRequest().setTaskId(localSearchTask.taskId())) + .get(); + assertTrue(getLocalTaskResponse.getTask().getTask().cancelled()); + expectThrows( + ResourceNotFoundException.class, + () -> client(REMOTE_CLUSTER).admin() + .cluster() + .getTask(new GetTaskRequest().setTaskId(remoteClusterSearchTask.get().taskId())) + .actionGet() + ); + }); + } finally { + SearchListenerPlugin.allowLocalQueryPhase(); + waitForSearchTasksToFinish(); + } + } + public void testCancelViaTasksAPI() throws Exception { Map testClusterInfo = setupTwoClusters(); String localIndex = (String) testClusterInfo.get("local.index"); String remoteIndex = (String) testClusterInfo.get("remote.index"); - SearchListenerPlugin.blockQueryPhase(); + SearchListenerPlugin.blockLocalQueryPhase(); + SearchListenerPlugin.blockRemoteQueryPhase(); SubmitAsyncSearchRequest request = new SubmitAsyncSearchRequest(localIndex, REMOTE_CLUSTER + ":" + remoteIndex); request.setCcsMinimizeRoundtrips(randomBoolean()); @@ -1350,7 +1537,8 @@ 
public void testCancelViaTasksAPI() throws Exception { assertTrue(response.isRunning()); } - SearchListenerPlugin.waitSearchStarted(); + SearchListenerPlugin.waitLocalSearchStarted(); + SearchListenerPlugin.waitRemoteSearchStarted(); ActionFuture cancelFuture; try { @@ -1426,7 +1614,8 @@ public void testCancelViaTasksAPI() throws Exception { } } finally { - SearchListenerPlugin.allowQueryPhase(); + SearchListenerPlugin.allowLocalQueryPhase(); + SearchListenerPlugin.allowRemoteQueryPhase(); } assertBusy(() -> assertTrue(cancelFuture.isDone())); @@ -1462,7 +1651,8 @@ public void testCancelViaAsyncSearchDelete() throws Exception { String localIndex = (String) testClusterInfo.get("local.index"); String remoteIndex = (String) testClusterInfo.get("remote.index"); - SearchListenerPlugin.blockQueryPhase(); + SearchListenerPlugin.blockLocalQueryPhase(); + SearchListenerPlugin.blockRemoteQueryPhase(); SubmitAsyncSearchRequest request = new SubmitAsyncSearchRequest(localIndex, REMOTE_CLUSTER + ":" + remoteIndex); request.setCcsMinimizeRoundtrips(randomBoolean()); @@ -1482,7 +1672,8 @@ public void testCancelViaAsyncSearchDelete() throws Exception { response.decRef(); } - SearchListenerPlugin.waitSearchStarted(); + SearchListenerPlugin.waitLocalSearchStarted(); + SearchListenerPlugin.waitRemoteSearchStarted(); try { ListTasksResponse listTasksResponse = client(LOCAL_CLUSTER).admin() @@ -1538,11 +1729,7 @@ public void testCancelViaAsyncSearchDelete() throws Exception { assertTrue(taskInfo.description(), taskInfo.cancelled()); } - ExecutionException e = expectThrows(ExecutionException.class, () -> getAsyncSearch(response.getId())); - assertNotNull(e); - assertNotNull(e.getCause()); - Throwable t = ExceptionsHelper.unwrap(e, ResourceNotFoundException.class); - assertNotNull("after deletion, getAsyncSearch should throw an Exception with ResourceNotFoundException in the causal chain", t); + expectThrows(ResourceNotFoundException.class, () -> getAsyncSearch(response.getId())); 
AsyncStatusResponse statusResponse = getAsyncStatus(response.getId()); assertTrue(statusResponse.isPartial()); @@ -1550,12 +1737,13 @@ public void testCancelViaAsyncSearchDelete() throws Exception { assertThat(statusResponse.getClusters().getTotal(), equalTo(2)); assertNull(statusResponse.getCompletionStatus()); } finally { - SearchListenerPlugin.allowQueryPhase(); + SearchListenerPlugin.allowLocalQueryPhase(); + SearchListenerPlugin.allowRemoteQueryPhase(); } waitForSearchTasksToFinish(); - assertBusy(() -> expectThrows(ExecutionException.class, () -> getAsyncStatus(response.getId()))); + assertBusy(() -> expectThrows(ResourceNotFoundException.class, () -> getAsyncStatus(response.getId()))); } public void testCancellationViaTimeoutWithAllowPartialResultsSetToFalse() throws Exception { @@ -1563,7 +1751,7 @@ public void testCancellationViaTimeoutWithAllowPartialResultsSetToFalse() throws String localIndex = (String) testClusterInfo.get("local.index"); String remoteIndex = (String) testClusterInfo.get("remote.index"); - SearchListenerPlugin.blockQueryPhase(); + SearchListenerPlugin.blockLocalQueryPhase(); TimeValue searchTimeout = new TimeValue(100, TimeUnit.MILLISECONDS); // query builder that will sleep for the specified amount of time in the query phase @@ -1589,7 +1777,7 @@ public void testCancellationViaTimeoutWithAllowPartialResultsSetToFalse() throws response.decRef(); } - SearchListenerPlugin.waitSearchStarted(); + SearchListenerPlugin.waitLocalSearchStarted(); // ensure tasks are present on both clusters and not cancelled try { @@ -1620,7 +1808,7 @@ public void testCancellationViaTimeoutWithAllowPartialResultsSetToFalse() throws } } finally { - SearchListenerPlugin.allowQueryPhase(); + SearchListenerPlugin.allowLocalQueryPhase(); } // query phase has begun, so wait for query failure (due to timeout) @@ -1706,24 +1894,24 @@ private static void assertAllShardsFailed(boolean minimizeRoundtrips, SearchResp ); } - protected AsyncSearchResponse 
submitAsyncSearch(SubmitAsyncSearchRequest request) throws ExecutionException, InterruptedException { - return client(LOCAL_CLUSTER).execute(SubmitAsyncSearchAction.INSTANCE, request).get(); + protected AsyncSearchResponse submitAsyncSearch(SubmitAsyncSearchRequest request) { + return client(LOCAL_CLUSTER).execute(SubmitAsyncSearchAction.INSTANCE, request).actionGet(); } - protected AsyncSearchResponse getAsyncSearch(String id) throws ExecutionException, InterruptedException { - return client(LOCAL_CLUSTER).execute(GetAsyncSearchAction.INSTANCE, new GetAsyncResultRequest(id)).get(); + protected AsyncSearchResponse getAsyncSearch(String id) { + return client(LOCAL_CLUSTER).execute(GetAsyncSearchAction.INSTANCE, new GetAsyncResultRequest(id)).actionGet(); } - protected AsyncStatusResponse getAsyncStatus(String id) throws ExecutionException, InterruptedException { - return client(LOCAL_CLUSTER).execute(GetAsyncStatusAction.INSTANCE, new GetAsyncStatusRequest(id)).get(); + protected AsyncStatusResponse getAsyncStatus(String id) { + return client(LOCAL_CLUSTER).execute(GetAsyncStatusAction.INSTANCE, new GetAsyncStatusRequest(id)).actionGet(); } - protected AcknowledgedResponse deleteAsyncSearch(String id) throws ExecutionException, InterruptedException { - return client().execute(TransportDeleteAsyncResultAction.TYPE, new DeleteAsyncResultRequest(id)).get(); + protected AcknowledgedResponse deleteAsyncSearch(String id) { + return client().execute(TransportDeleteAsyncResultAction.TYPE, new DeleteAsyncResultRequest(id)).actionGet(); } private Map setupTwoClusters() { - String localIndex = "demo"; + String localIndex = "local"; int numShardsLocal = randomIntBetween(2, 12); Settings localSettings = indexSettings(numShardsLocal, randomIntBetween(0, 1)).build(); assertAcked( @@ -1735,7 +1923,7 @@ private Map setupTwoClusters() { ); indexDocs(client(LOCAL_CLUSTER), localIndex); - String remoteIndex = "prod"; + String remoteIndex = "remote"; int numShardsRemote = 
randomIntBetween(2, 12); final InternalTestCluster remoteCluster = cluster(REMOTE_CLUSTER); remoteCluster.ensureAtLeastNumDataNodes(randomIntBetween(1, 3)); @@ -1795,8 +1983,10 @@ public void resetSearchListenerPlugin() throws Exception { } public static class SearchListenerPlugin extends Plugin { - private static final AtomicReference startedLatch = new AtomicReference<>(); - private static final AtomicReference queryLatch = new AtomicReference<>(); + private static final AtomicReference startedLocalLatch = new AtomicReference<>(); + private static final AtomicReference startedRemoteLatch = new AtomicReference<>(); + private static final AtomicReference localQueryLatch = new AtomicReference<>(); + private static final AtomicReference remoteQueryLatch = new AtomicReference<>(); private static final AtomicReference failedQueryLatch = new AtomicReference<>(); /** @@ -1804,11 +1994,17 @@ public static class SearchListenerPlugin extends Plugin { * avoid test problems around searches of the .async-search index */ static void negate() { - if (startedLatch.get() != null) { - startedLatch.get().countDown(); + if (startedLocalLatch.get() != null) { + startedLocalLatch.get().countDown(); } - if (queryLatch.get() != null) { - queryLatch.get().countDown(); + if (startedRemoteLatch.get() != null) { + startedRemoteLatch.get().countDown(); + } + if (localQueryLatch.get() != null) { + localQueryLatch.get().countDown(); + } + if (remoteQueryLatch.get() != null) { + remoteQueryLatch.get().countDown(); } if (failedQueryLatch.get() != null) { failedQueryLatch.get().countDown(); @@ -1816,23 +2012,39 @@ static void negate() { } static void reset() { - startedLatch.set(new CountDownLatch(1)); + startedLocalLatch.set(new CountDownLatch(1)); + startedRemoteLatch.set(new CountDownLatch(1)); failedQueryLatch.set(new CountDownLatch(1)); } - static void blockQueryPhase() { - queryLatch.set(new CountDownLatch(1)); + static void blockRemoteQueryPhase() { + remoteQueryLatch.set(new 
CountDownLatch(1)); + } + + static void allowRemoteQueryPhase() { + final CountDownLatch latch = remoteQueryLatch.get(); + if (latch != null) { + latch.countDown(); + } + } + + static void blockLocalQueryPhase() { + localQueryLatch.set(new CountDownLatch(1)); } - static void allowQueryPhase() { - final CountDownLatch latch = queryLatch.get(); + static void allowLocalQueryPhase() { + final CountDownLatch latch = localQueryLatch.get(); if (latch != null) { latch.countDown(); } } - static void waitSearchStarted() throws InterruptedException { - assertTrue(startedLatch.get().await(60, TimeUnit.SECONDS)); + static void waitRemoteSearchStarted() throws InterruptedException { + assertTrue(startedRemoteLatch.get().await(60, TimeUnit.SECONDS)); + } + + static void waitLocalSearchStarted() throws InterruptedException { + assertTrue(startedLocalLatch.get().await(60, TimeUnit.SECONDS)); } static void waitQueryFailure() throws Exception { @@ -1849,8 +2061,16 @@ public void onNewReaderContext(ReaderContext readerContext) { @Override public void onPreQueryPhase(SearchContext searchContext) { - startedLatch.get().countDown(); - final CountDownLatch latch = queryLatch.get(); + final CountDownLatch latch; + if (searchContext.indexShard().shardId().getIndexName().equals("remote")) { + startedRemoteLatch.get().countDown(); + latch = remoteQueryLatch.get(); + } else if (searchContext.indexShard().shardId().getIndexName().equals("local")) { + startedLocalLatch.get().countDown(); + latch = localQueryLatch.get(); + } else { + throw new AssertionError("unexpected index name: " + searchContext.indexShard().shardId().getIndexName()); + } if (latch != null) { try { assertTrue(latch.await(60, TimeUnit.SECONDS)); diff --git a/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/AsyncSearchTask.java b/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/AsyncSearchTask.java index 484683fc6ffdd..d14b60f7b77f8 100644 --- 
a/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/AsyncSearchTask.java +++ b/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/AsyncSearchTask.java @@ -519,6 +519,7 @@ public void onFinalReduce(List shards, TotalHits totalHits, Interna */ @Override public void onClusterResponseMinimizeRoundtrips(String clusterAlias, SearchResponse clusterResponse) { + checkCancellation(); // no need to call the delegate progress listener, since this method is only called for minimize_roundtrips=true searchResponse.updateResponseMinimizeRoundtrips(clusterAlias, clusterResponse); }