diff --git a/docs/changelog/126583.yaml b/docs/changelog/126583.yaml new file mode 100644 index 0000000000000..a6732b7936f8a --- /dev/null +++ b/docs/changelog/126583.yaml @@ -0,0 +1,5 @@ +pr: 126583 +summary: Cancel expired async search task when a remote returns its results +area: CCS +type: bug +issues: [] diff --git a/x-pack/plugin/async-search/src/internalClusterTest/java/org/elasticsearch/xpack/search/CCSUsageTelemetryAsyncSearchIT.java b/x-pack/plugin/async-search/src/internalClusterTest/java/org/elasticsearch/xpack/search/CCSUsageTelemetryAsyncSearchIT.java index 1b19f6f04693b..19eaaf63c552a 100644 --- a/x-pack/plugin/async-search/src/internalClusterTest/java/org/elasticsearch/xpack/search/CCSUsageTelemetryAsyncSearchIT.java +++ b/x-pack/plugin/async-search/src/internalClusterTest/java/org/elasticsearch/xpack/search/CCSUsageTelemetryAsyncSearchIT.java @@ -87,7 +87,7 @@ public void resetSearchListenerPlugin() { } private SubmitAsyncSearchRequest makeSearchRequest(String... indices) { - CrossClusterAsyncSearchIT.SearchListenerPlugin.blockQueryPhase(); + CrossClusterAsyncSearchIT.SearchListenerPlugin.blockLocalQueryPhase(); SubmitAsyncSearchRequest request = new SubmitAsyncSearchRequest(indices); request.setCcsMinimizeRoundtrips(randomBoolean()); @@ -220,7 +220,8 @@ public void testCancelledSearch() throws Exception { String remoteIndex = (String) testClusterInfo.get("remote.index"); SubmitAsyncSearchRequest searchRequest = makeSearchRequest(localIndex, REMOTE1 + ":" + remoteIndex); - CrossClusterAsyncSearchIT.SearchListenerPlugin.blockQueryPhase(); + CrossClusterAsyncSearchIT.SearchListenerPlugin.blockLocalQueryPhase(); + CrossClusterAsyncSearchIT.SearchListenerPlugin.blockRemoteQueryPhase(); String nodeName = cluster(LOCAL_CLUSTER).getRandomNodeName(); final AsyncSearchResponse response = cluster(LOCAL_CLUSTER).client(nodeName) @@ -232,7 +233,8 @@ public void testCancelledSearch() throws Exception { response.decRef(); assertTrue(response.isRunning()); } - CrossClusterAsyncSearchIT.SearchListenerPlugin.waitSearchStarted(); + CrossClusterAsyncSearchIT.SearchListenerPlugin.waitLocalSearchStarted(); + CrossClusterAsyncSearchIT.SearchListenerPlugin.waitRemoteSearchStarted(); ActionFuture cancelFuture; try { @@ -290,7 +292,8 @@ public void testCancelledSearch() throws Exception { assertTrue(taskInfo.description(), taskInfo.cancelled()); } } finally { - CrossClusterAsyncSearchIT.SearchListenerPlugin.allowQueryPhase(); + CrossClusterAsyncSearchIT.SearchListenerPlugin.allowLocalQueryPhase(); + CrossClusterAsyncSearchIT.SearchListenerPlugin.allowRemoteQueryPhase(); } assertBusy(() -> assertTrue(cancelFuture.isDone())); @@ -314,7 +317,7 @@ private CCSTelemetrySnapshot getTelemetrySnapshot(String nodeName) { } private Map setupClusters() { - String localIndex = "demo"; + String localIndex = "local"; int numShardsLocal = randomIntBetween(2, 10); Settings localSettings = indexSettings(numShardsLocal, randomIntBetween(0, 1)).build(); assertAcked( @@ -326,7 +329,7 @@ private Map setupClusters() { ); indexDocs(client(LOCAL_CLUSTER), localIndex); - String remoteIndex = "prod"; + String remoteIndex = "remote"; int numShardsRemote = randomIntBetween(2, 10); for (String clusterAlias : remoteClusterAlias()) { final InternalTestCluster remoteCluster = cluster(clusterAlias); diff --git a/x-pack/plugin/async-search/src/internalClusterTest/java/org/elasticsearch/xpack/search/CrossClusterAsyncSearchIT.java b/x-pack/plugin/async-search/src/internalClusterTest/java/org/elasticsearch/xpack/search/CrossClusterAsyncSearchIT.java index 2a8daf8bfe12c..d951b21ba1380 100644 --- a/x-pack/plugin/async-search/src/internalClusterTest/java/org/elasticsearch/xpack/search/CrossClusterAsyncSearchIT.java +++ b/x-pack/plugin/async-search/src/internalClusterTest/java/org/elasticsearch/xpack/search/CrossClusterAsyncSearchIT.java @@ -11,6 +11,8 @@ import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.action.ActionFuture; import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksRequest; +import org.elasticsearch.action.admin.cluster.node.tasks.get.GetTaskRequest; +import org.elasticsearch.action.admin.cluster.node.tasks.get.GetTaskResponse; import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.search.SearchType; @@ -67,7 +69,6 @@ import java.util.List; import java.util.Map; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; @@ -141,7 +142,7 @@ public void testClusterDetailsAfterSuccessfulCCS() throws Exception { int localNumShards = (Integer) testClusterInfo.get("local.num_shards"); int remoteNumShards = (Integer) testClusterInfo.get("remote.num_shards"); - SearchListenerPlugin.blockQueryPhase(); + SearchListenerPlugin.blockLocalQueryPhase(); SubmitAsyncSearchRequest request = new SubmitAsyncSearchRequest(localIndex, REMOTE_CLUSTER + ":" + remoteIndex); request.setCcsMinimizeRoundtrips(randomBoolean()); @@ -177,8 +178,8 @@ public void testClusterDetailsAfterSuccessfulCCS() throws Exception { response.decRef(); } - SearchListenerPlugin.waitSearchStarted(); - SearchListenerPlugin.allowQueryPhase(); + SearchListenerPlugin.waitLocalSearchStarted(); + SearchListenerPlugin.allowLocalQueryPhase(); waitForSearchTasksToFinish(); final AsyncSearchResponse finishedResponse = getAsyncSearch(responseId); @@ -262,7 +263,7 @@ public void testCCSClusterDetailsWhereAllShardsSkippedInCanMatch() throws Except int localNumShards = (Integer) testClusterInfo.get("local.num_shards"); int remoteNumShards = (Integer) testClusterInfo.get("remote.num_shards"); - SearchListenerPlugin.blockQueryPhase(); + SearchListenerPlugin.blockLocalQueryPhase(); SubmitAsyncSearchRequest request = new SubmitAsyncSearchRequest(localIndex, REMOTE_CLUSTER + ":" + remoteIndex); request.setCcsMinimizeRoundtrips(randomBoolean()); @@ -311,9 +312,9 @@ public void testCCSClusterDetailsWhereAllShardsSkippedInCanMatch() throws Except response.decRef(); } if (dfs) { - SearchListenerPlugin.waitSearchStarted(); + SearchListenerPlugin.waitLocalSearchStarted(); } - SearchListenerPlugin.allowQueryPhase(); + SearchListenerPlugin.allowLocalQueryPhase(); waitForSearchTasksToFinish(); final AsyncSearchResponse finishedResponse = getAsyncSearch(responseId); @@ -514,7 +515,7 @@ public void testClusterDetailsAfterCCSWithFailuresOnOneShardOnly() throws Except int localNumShards = (Integer) testClusterInfo.get("local.num_shards"); int remoteNumShards = (Integer) testClusterInfo.get("remote.num_shards"); - SearchListenerPlugin.blockQueryPhase(); + SearchListenerPlugin.blockLocalQueryPhase(); SubmitAsyncSearchRequest request = new SubmitAsyncSearchRequest(localIndex, REMOTE_CLUSTER + ":" + remoteIndex); request.setCcsMinimizeRoundtrips(randomBoolean()); @@ -550,8 +551,8 @@ public void testClusterDetailsAfterCCSWithFailuresOnOneShardOnly() throws Except response.decRef(); } - SearchListenerPlugin.waitSearchStarted(); - SearchListenerPlugin.allowQueryPhase(); + SearchListenerPlugin.waitLocalSearchStarted(); + SearchListenerPlugin.allowLocalQueryPhase(); waitForSearchTasksToFinish(); @@ -652,7 +653,7 @@ public void testClusterDetailsAfterCCSWithFailuresOnOneClusterOnly() throws Exce int remoteNumShards = (Integer) testClusterInfo.get("remote.num_shards"); boolean skipUnavailable = (Boolean) testClusterInfo.get("remote.skip_unavailable"); - SearchListenerPlugin.blockQueryPhase(); + SearchListenerPlugin.blockLocalQueryPhase(); SubmitAsyncSearchRequest request = new SubmitAsyncSearchRequest(localIndex, REMOTE_CLUSTER + ":" + remoteIndex); request.setCcsMinimizeRoundtrips(randomBoolean()); @@ -695,8 +696,8 @@ public void testClusterDetailsAfterCCSWithFailuresOnOneClusterOnly() throws Exce response.decRef(); } - SearchListenerPlugin.waitSearchStarted(); - SearchListenerPlugin.allowQueryPhase(); + SearchListenerPlugin.waitLocalSearchStarted(); + SearchListenerPlugin.allowLocalQueryPhase(); waitForSearchTasksToFinish(); @@ -815,7 +816,7 @@ public void testClusterDetailsAfterCCSWhereRemoteClusterHasNoShardsToSearch() th String localIndex = (String) testClusterInfo.get("local.index"); int localNumShards = (Integer) testClusterInfo.get("local.num_shards"); - SearchListenerPlugin.blockQueryPhase(); + SearchListenerPlugin.blockLocalQueryPhase(); // query against a missing index on the remote cluster SubmitAsyncSearchRequest request = new SubmitAsyncSearchRequest(localIndex, REMOTE_CLUSTER + ":" + "no_such_index*"); @@ -856,8 +857,8 @@ public void testClusterDetailsAfterCCSWhereRemoteClusterHasNoShardsToSearch() th response.decRef(); } - SearchListenerPlugin.waitSearchStarted(); - SearchListenerPlugin.allowQueryPhase(); + SearchListenerPlugin.waitLocalSearchStarted(); + SearchListenerPlugin.allowLocalQueryPhase(); waitForSearchTasksToFinish(); @@ -1325,12 +1326,198 @@ public void testRemoteClusterOnlyCCSWithFailuresOnAllShards() throws Exception { } } + /** + * This test verifies that get async search triggers an automatic task cancellation when trying to retrieve + * results for an expired async search + */ + public void testCancelViaExpirationOnGetAsyncSearchWithMinimizeRoundtrips() throws Exception { + Map testClusterInfo = setupTwoClusters(); + String localIndex = (String) testClusterInfo.get("local.index"); + String remoteIndex = (String) testClusterInfo.get("remote.index"); + + SearchListenerPlugin.blockLocalQueryPhase(); + SearchListenerPlugin.blockRemoteQueryPhase(); + + SubmitAsyncSearchRequest request = new SubmitAsyncSearchRequest(localIndex, REMOTE_CLUSTER + ":" + remoteIndex); + request.setCcsMinimizeRoundtrips(true); + request.setWaitForCompletionTimeout(TimeValue.timeValueMillis(1)); + request.setKeepAlive(new TimeValue(1, TimeUnit.SECONDS)); + request.setKeepOnCompletion(true); + + final AsyncSearchResponse response = submitAsyncSearch(request); + try { + assertNotNull(response.getSearchResponse()); + } finally { + response.decRef(); + assertTrue(response.isRunning()); + } + + SearchListenerPlugin.waitLocalSearchStarted(); + SearchListenerPlugin.waitRemoteSearchStarted(); + + ListTasksResponse listTasksResponse = client(LOCAL_CLUSTER).admin() + .cluster() + .prepareListTasks() + .setActions(TransportSearchAction.TYPE.name()) + .get(); + List localSearchTasks = listTasksResponse.getTasks(); + assertThat(localSearchTasks.size(), equalTo(1)); + TaskInfo localSearchTask = localSearchTasks.get(0); + assertFalse("taskInfo on local cluster should not be cancelled yet: " + localSearchTask, localSearchTask.cancelled()); + + AtomicReference remoteClusterSearchTask = new AtomicReference<>(); + assertBusy(() -> { + List remoteSearchTasks = client(REMOTE_CLUSTER).admin() + .cluster() + .prepareListTasks() + .setActions(TransportSearchAction.TYPE.name()) + .get() + .getTasks(); + assertThat(remoteSearchTasks.size(), equalTo(1)); + remoteClusterSearchTask.set(remoteSearchTasks.getFirst()); + }); + assertFalse( + "taskInfo on remote cluster should not be cancelled yet: " + remoteClusterSearchTask.get(), + remoteClusterSearchTask.get().cancelled() + ); + + // wait until the async search has expired (takes one second - keep alive can't be set lower than 1s) + // the get async search that returns 404 will also cancel the task as it is expired + assertBusy(() -> { + expectThrows(ResourceNotFoundException.class, () -> { + AsyncSearchResponse asyncSearchResponse = getAsyncSearch(response.getId()); + asyncSearchResponse.decRef(); + }); + }); + + try { + assertBusy(() -> { + // check that the tasks are cancelled + GetTaskResponse getLocalTaskResponse = client(LOCAL_CLUSTER).admin() + .cluster() + .getTask(new GetTaskRequest().setTaskId(localSearchTask.taskId())) + .get(); + assertTrue(getLocalTaskResponse.getTask().getTask().cancelled()); + GetTaskResponse getRemoteTaskResponse = client(REMOTE_CLUSTER).admin() + .cluster() + .getTask(new GetTaskRequest().setTaskId(remoteClusterSearchTask.get().taskId())) + .get(); + assertTrue(getRemoteTaskResponse.getTask().getTask().cancelled()); + }); + } finally { + SearchListenerPlugin.allowRemoteQueryPhase(); + SearchListenerPlugin.allowLocalQueryPhase(); + waitForSearchTasksToFinish(); + } + } + + public void testCancelViaExpirationOnRemoteResultsWithMinimizeRoundtrips() throws Exception { + Map testClusterInfo = setupTwoClusters(); + String localIndex = (String) testClusterInfo.get("local.index"); + String remoteIndex = (String) testClusterInfo.get("remote.index"); + + SearchListenerPlugin.blockLocalQueryPhase(); + SearchListenerPlugin.blockRemoteQueryPhase(); + + SubmitAsyncSearchRequest request = new SubmitAsyncSearchRequest(localIndex, REMOTE_CLUSTER + ":" + remoteIndex); + request.setCcsMinimizeRoundtrips(true); + request.setWaitForCompletionTimeout(TimeValue.timeValueMillis(1)); + request.setKeepAlive(new TimeValue(1, TimeUnit.SECONDS)); + request.setKeepOnCompletion(true); + + final AsyncSearchResponse response = submitAsyncSearch(request); + try { + assertNotNull(response.getSearchResponse()); + } finally { + response.decRef(); + assertTrue(response.isRunning()); + } + + SearchListenerPlugin.waitLocalSearchStarted(); + SearchListenerPlugin.waitRemoteSearchStarted(); + + ListTasksResponse listTasksResponse = client(LOCAL_CLUSTER).admin() + .cluster() + .prepareListTasks() + .setActions(TransportSearchAction.TYPE.name()) + .get(); + List localSearchTasks = listTasksResponse.getTasks(); + assertThat(localSearchTasks.size(), equalTo(1)); + TaskInfo localSearchTask = localSearchTasks.get(0); + assertFalse("taskInfo on local cluster should not be cancelled yet: " + localSearchTask, localSearchTask.cancelled()); + + AtomicReference remoteClusterSearchTask = new AtomicReference<>(); + assertBusy(() -> { + List remoteSearchTasks = client(REMOTE_CLUSTER).admin() + .cluster() + .prepareListTasks() + .setActions(TransportSearchAction.TYPE.name()) + .get() + .getTasks(); + assertThat(remoteSearchTasks.size(), equalTo(1)); + remoteClusterSearchTask.set(remoteSearchTasks.getFirst()); + }); + assertFalse( + "taskInfo on remote cluster should not be cancelled yet: " + remoteClusterSearchTask.get(), + remoteClusterSearchTask.get().cancelled() + ); + + AsyncSearchResponse asyncSearchResponse = getAsyncSearch(response.getId()); + asyncSearchResponse.decRef(); + + // wait until the async search has expired (takes one second - keep alive can't be set lower than 1s) + // don't call get async search as that triggers cancellation of the task - we want to verify that we can cancel it + // as we get results from a remote cluster + assertBusy(() -> assertThat(System.currentTimeMillis(), greaterThanOrEqualTo(asyncSearchResponse.getExpirationTime()))); + + { + // check that the tasks are cancelled + GetTaskResponse getLocalTaskResponse = client(LOCAL_CLUSTER).admin() + .cluster() + .getTask(new GetTaskRequest().setTaskId(localSearchTask.taskId())) + .get(); + assertFalse(getLocalTaskResponse.getTask().getTask().cancelled()); + GetTaskResponse getRemoteTaskResponse = client(REMOTE_CLUSTER).admin() + .cluster() + .getTask(new GetTaskRequest().setTaskId(remoteClusterSearchTask.get().taskId())) + .get(); + assertFalse(getRemoteTaskResponse.getTask().getTask().cancelled()); + } + + // unblock the remote query phase, but not the local one: we want to test that getting results from a remote cluster triggers + // cancellation given the async search has expired + SearchListenerPlugin.allowRemoteQueryPhase(); + + try { + assertBusy(() -> { + // check that the tasks are cancelled - they get cancelled because we check for cancellation in + // AsyncSearchTask#onClusterResponseMinimizeRoundtrips + GetTaskResponse getLocalTaskResponse = client(LOCAL_CLUSTER).admin() + .cluster() + .getTask(new GetTaskRequest().setTaskId(localSearchTask.taskId())) + .get(); + assertTrue(getLocalTaskResponse.getTask().getTask().cancelled()); + expectThrows( + ResourceNotFoundException.class, + () -> client(REMOTE_CLUSTER).admin() + .cluster() + .getTask(new GetTaskRequest().setTaskId(remoteClusterSearchTask.get().taskId())) + .actionGet() + ); + }); + } finally { + SearchListenerPlugin.allowLocalQueryPhase(); + waitForSearchTasksToFinish(); + } + } + public void testCancelViaTasksAPI() throws Exception { Map testClusterInfo = setupTwoClusters(); String localIndex = (String) testClusterInfo.get("local.index"); String remoteIndex = (String) testClusterInfo.get("remote.index"); - SearchListenerPlugin.blockQueryPhase(); + SearchListenerPlugin.blockLocalQueryPhase(); + SearchListenerPlugin.blockRemoteQueryPhase(); SubmitAsyncSearchRequest request = new SubmitAsyncSearchRequest(localIndex, REMOTE_CLUSTER + ":" + remoteIndex); request.setCcsMinimizeRoundtrips(randomBoolean()); @@ -1350,7 +1537,8 @@ public void testCancelViaTasksAPI() throws Exception { assertTrue(response.isRunning()); } - SearchListenerPlugin.waitSearchStarted(); + SearchListenerPlugin.waitLocalSearchStarted(); + SearchListenerPlugin.waitRemoteSearchStarted(); ActionFuture cancelFuture; try { @@ -1426,7 +1614,8 @@ public void testCancelViaTasksAPI() throws Exception { } } finally { - SearchListenerPlugin.allowQueryPhase(); + SearchListenerPlugin.allowLocalQueryPhase(); + SearchListenerPlugin.allowRemoteQueryPhase(); } assertBusy(() -> assertTrue(cancelFuture.isDone())); @@ -1462,7 +1651,8 @@ public void testCancelViaAsyncSearchDelete() throws Exception { String localIndex = (String) testClusterInfo.get("local.index"); String remoteIndex = (String) testClusterInfo.get("remote.index"); - SearchListenerPlugin.blockQueryPhase(); + SearchListenerPlugin.blockLocalQueryPhase(); + SearchListenerPlugin.blockRemoteQueryPhase(); SubmitAsyncSearchRequest request = new SubmitAsyncSearchRequest(localIndex, REMOTE_CLUSTER + ":" + remoteIndex); request.setCcsMinimizeRoundtrips(randomBoolean()); @@ -1482,7 +1672,8 @@ public void testCancelViaAsyncSearchDelete() throws Exception { response.decRef(); } - SearchListenerPlugin.waitSearchStarted(); + SearchListenerPlugin.waitLocalSearchStarted(); + SearchListenerPlugin.waitRemoteSearchStarted(); try { ListTasksResponse listTasksResponse = client(LOCAL_CLUSTER).admin() @@ -1538,11 +1729,7 @@ public void testCancelViaAsyncSearchDelete() throws Exception { assertTrue(taskInfo.description(), taskInfo.cancelled()); } - ExecutionException e = expectThrows(ExecutionException.class, () -> getAsyncSearch(response.getId())); - assertNotNull(e); - assertNotNull(e.getCause()); - Throwable t = ExceptionsHelper.unwrap(e, ResourceNotFoundException.class); - assertNotNull("after deletion, getAsyncSearch should throw an Exception with ResourceNotFoundException in the causal chain", t); + expectThrows(ResourceNotFoundException.class, () -> getAsyncSearch(response.getId())); AsyncStatusResponse statusResponse = getAsyncStatus(response.getId()); assertTrue(statusResponse.isPartial()); @@ -1550,12 +1737,13 @@ public void testCancelViaAsyncSearchDelete() throws Exception { assertThat(statusResponse.getClusters().getTotal(), equalTo(2)); assertNull(statusResponse.getCompletionStatus()); } finally { - SearchListenerPlugin.allowQueryPhase(); + SearchListenerPlugin.allowLocalQueryPhase(); + SearchListenerPlugin.allowRemoteQueryPhase(); } waitForSearchTasksToFinish(); - assertBusy(() -> expectThrows(ExecutionException.class, () -> getAsyncStatus(response.getId()))); + assertBusy(() -> expectThrows(ResourceNotFoundException.class, () -> getAsyncStatus(response.getId()))); } public void testCancellationViaTimeoutWithAllowPartialResultsSetToFalse() throws Exception { @@ -1563,7 +1751,7 @@ public void testCancellationViaTimeoutWithAllowPartialResultsSetToFalse() throws String localIndex = (String) testClusterInfo.get("local.index"); String remoteIndex = (String) testClusterInfo.get("remote.index"); - SearchListenerPlugin.blockQueryPhase(); + SearchListenerPlugin.blockLocalQueryPhase(); TimeValue searchTimeout = new TimeValue(100, TimeUnit.MILLISECONDS); // query builder that will sleep for the specified amount of time in the query phase @@ -1589,7 +1777,7 @@ public void testCancellationViaTimeoutWithAllowPartialResultsSetToFalse() throws response.decRef(); } - SearchListenerPlugin.waitSearchStarted(); + SearchListenerPlugin.waitLocalSearchStarted(); // ensure tasks are present on both clusters and not cancelled try { @@ -1620,7 +1808,7 @@ public void testCancellationViaTimeoutWithAllowPartialResultsSetToFalse() throws } } finally { - SearchListenerPlugin.allowQueryPhase(); + SearchListenerPlugin.allowLocalQueryPhase(); } // query phase has begun, so wait for query failure (due to timeout) @@ -1706,24 +1894,24 @@ private static void assertAllShardsFailed(boolean minimizeRoundtrips, SearchResp ); } - protected AsyncSearchResponse submitAsyncSearch(SubmitAsyncSearchRequest request) throws ExecutionException, InterruptedException { - return client(LOCAL_CLUSTER).execute(SubmitAsyncSearchAction.INSTANCE, request).get(); + protected AsyncSearchResponse submitAsyncSearch(SubmitAsyncSearchRequest request) { + return client(LOCAL_CLUSTER).execute(SubmitAsyncSearchAction.INSTANCE, request).actionGet(); } - protected AsyncSearchResponse getAsyncSearch(String id) throws ExecutionException, InterruptedException { - return client(LOCAL_CLUSTER).execute(GetAsyncSearchAction.INSTANCE, new GetAsyncResultRequest(id)).get(); + protected AsyncSearchResponse getAsyncSearch(String id) { + return client(LOCAL_CLUSTER).execute(GetAsyncSearchAction.INSTANCE, new GetAsyncResultRequest(id)).actionGet(); } - protected AsyncStatusResponse getAsyncStatus(String id) throws ExecutionException, InterruptedException { - return client(LOCAL_CLUSTER).execute(GetAsyncStatusAction.INSTANCE, new GetAsyncStatusRequest(id)).get(); + protected AsyncStatusResponse getAsyncStatus(String id) { + return client(LOCAL_CLUSTER).execute(GetAsyncStatusAction.INSTANCE, new GetAsyncStatusRequest(id)).actionGet(); } - protected AcknowledgedResponse deleteAsyncSearch(String id) throws ExecutionException, InterruptedException { - return client().execute(TransportDeleteAsyncResultAction.TYPE, new DeleteAsyncResultRequest(id)).get(); + protected AcknowledgedResponse deleteAsyncSearch(String id) { + return client().execute(TransportDeleteAsyncResultAction.TYPE, new DeleteAsyncResultRequest(id)).actionGet(); } private Map setupTwoClusters() { - String localIndex = "demo"; + String localIndex = "local"; int numShardsLocal = randomIntBetween(2, 12); Settings localSettings = indexSettings(numShardsLocal, randomIntBetween(0, 1)).build(); assertAcked( @@ -1735,7 +1923,7 @@ private Map setupTwoClusters() { ); indexDocs(client(LOCAL_CLUSTER), localIndex); - String remoteIndex = "prod"; + String remoteIndex = "remote"; int numShardsRemote = randomIntBetween(2, 12); final InternalTestCluster remoteCluster = cluster(REMOTE_CLUSTER); remoteCluster.ensureAtLeastNumDataNodes(randomIntBetween(1, 3)); @@ -1795,8 +1983,10 @@ public void resetSearchListenerPlugin() throws Exception { } public static class SearchListenerPlugin extends Plugin { - private static final AtomicReference startedLatch = new AtomicReference<>(); - private static final AtomicReference queryLatch = new AtomicReference<>(); + private static final AtomicReference startedLocalLatch = new AtomicReference<>(); + private static final AtomicReference startedRemoteLatch = new AtomicReference<>(); + private static final AtomicReference localQueryLatch = new AtomicReference<>(); + private static final AtomicReference remoteQueryLatch = new AtomicReference<>(); private static final AtomicReference failedQueryLatch = new AtomicReference<>(); /** @@ -1804,11 +1994,17 @@ public static class SearchListenerPlugin extends Plugin { * avoid test problems around searches of the .async-search index */ static void negate() { - if (startedLatch.get() != null) { - startedLatch.get().countDown(); + if (startedLocalLatch.get() != null) { + startedLocalLatch.get().countDown(); } - if (queryLatch.get() != null) { - queryLatch.get().countDown(); + if (startedRemoteLatch.get() != null) { + startedRemoteLatch.get().countDown(); + } + if (localQueryLatch.get() != null) { + localQueryLatch.get().countDown(); + } + if (remoteQueryLatch.get() != null) { + remoteQueryLatch.get().countDown(); } if (failedQueryLatch.get() != null) { failedQueryLatch.get().countDown(); @@ -1816,23 +2012,39 @@ static void negate() { } static void reset() { - startedLatch.set(new CountDownLatch(1)); + startedLocalLatch.set(new CountDownLatch(1)); + startedRemoteLatch.set(new CountDownLatch(1)); failedQueryLatch.set(new CountDownLatch(1)); } - static void blockQueryPhase() { - queryLatch.set(new CountDownLatch(1)); + static void blockRemoteQueryPhase() { + remoteQueryLatch.set(new CountDownLatch(1)); + } + + static void allowRemoteQueryPhase() { + final CountDownLatch latch = remoteQueryLatch.get(); + if (latch != null) { + latch.countDown(); + } + } + + static void blockLocalQueryPhase() { + localQueryLatch.set(new CountDownLatch(1)); } - static void allowQueryPhase() { - final CountDownLatch latch = queryLatch.get(); + static void allowLocalQueryPhase() { + final CountDownLatch latch = localQueryLatch.get(); if (latch != null) { latch.countDown(); } } - static void waitSearchStarted() throws InterruptedException { - assertTrue(startedLatch.get().await(60, TimeUnit.SECONDS)); + static void waitRemoteSearchStarted() throws InterruptedException { + assertTrue(startedRemoteLatch.get().await(60, TimeUnit.SECONDS)); + } + + static void waitLocalSearchStarted() throws InterruptedException { + assertTrue(startedLocalLatch.get().await(60, TimeUnit.SECONDS)); } static void waitQueryFailure() throws Exception { @@ -1849,8 +2061,16 @@ public void onNewReaderContext(ReaderContext readerContext) { @Override public void onPreQueryPhase(SearchContext searchContext) { - startedLatch.get().countDown(); - final CountDownLatch latch = queryLatch.get(); + final CountDownLatch latch; + if (searchContext.indexShard().shardId().getIndexName().equals("remote")) { + startedRemoteLatch.get().countDown(); + latch = remoteQueryLatch.get(); + } else if (searchContext.indexShard().shardId().getIndexName().equals("local")) { + startedLocalLatch.get().countDown(); + latch = localQueryLatch.get(); + } else { + throw new AssertionError("unexpected index name: " + searchContext.indexShard().shardId().getIndexName()); + } if (latch != null) { try { assertTrue(latch.await(60, TimeUnit.SECONDS)); diff --git a/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/AsyncSearchTask.java b/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/AsyncSearchTask.java index 484683fc6ffdd..d14b60f7b77f8 100644 --- a/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/AsyncSearchTask.java +++ b/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/AsyncSearchTask.java @@ -519,6 +519,7 @@ public void onFinalReduce(List shards, TotalHits totalHits, Interna */ @Override public void onClusterResponseMinimizeRoundtrips(String clusterAlias, SearchResponse clusterResponse) { + checkCancellation(); // no need to call the delegate progress listener, since this method is only called for minimize_roundtrips=true searchResponse.updateResponseMinimizeRoundtrips(clusterAlias, clusterResponse); }