Skip to content

Commit e2e761b

Browse files
committed
Add test skeleton
1 parent ff822ef commit e2e761b

File tree

2 files changed

+59
-1
lines changed

2 files changed

+59
-1
lines changed

x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterAsyncQueryIT.java

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import org.elasticsearch.transport.RemoteClusterAware;
3333
import org.elasticsearch.xcontent.XContentBuilder;
3434
import org.elasticsearch.xcontent.json.JsonXContent;
35+
import org.elasticsearch.xpack.core.async.AsyncStopRequest;
3536
import org.elasticsearch.xpack.core.async.DeleteAsyncResultRequest;
3637
import org.elasticsearch.xpack.core.async.GetAsyncResultRequest;
3738
import org.elasticsearch.xpack.core.async.TransportDeleteAsyncResultAction;
@@ -357,6 +358,63 @@ public void testAsyncQueriesWithLimit0() throws IOException {
357358
}
358359
}
359360

361+
public void testStopQuery() throws Exception {
362+
Map<String, Object> testClusterInfo = setupClusters(3);
363+
int localNumShards = (Integer) testClusterInfo.get("local.num_shards");
364+
int remote1NumShards = (Integer) testClusterInfo.get("remote1.num_shards");
365+
int remote2NumShards = (Integer) testClusterInfo.get("remote2.blocking_index.num_shards");
366+
367+
Tuple<Boolean, Boolean> includeCCSMetadata = randomIncludeCCSMetadata();
368+
Boolean requestIncludeMeta = includeCCSMetadata.v1();
369+
boolean responseExpectMeta = includeCCSMetadata.v2();
370+
371+
AtomicReference<String> asyncExecutionId = new AtomicReference<>();
372+
373+
String q = "FROM logs-*,cluster-a:logs-*,remote-b:blocking | STATS total=sum(const) | LIMIT 10";
374+
try (EsqlQueryResponse resp = runAsyncQuery(q, requestIncludeMeta, null, TimeValue.timeValueMillis(100))) {
375+
assertTrue(resp.isRunning());
376+
assertNotNull("async execution id is null", resp.asyncExecutionId());
377+
asyncExecutionId.set(resp.asyncExecutionId().get());
378+
// executionInfo may or may not be set on the initial response when there is a relatively low wait_for_completion_timeout
379+
// so we do not check for it here
380+
}
381+
382+
// wait until we know that the query against 'remote-b:blocking' has started
383+
PauseFieldPlugin.startEmitting.await(30, TimeUnit.SECONDS);
384+
385+
// wait until the query of 'cluster-a:logs-*' has finished (it is not blocked since we are not searching the 'blocking' index on it)
386+
assertBusy(() -> {
387+
try (EsqlQueryResponse asyncResponse = getAsyncResponse(asyncExecutionId.get())) {
388+
EsqlExecutionInfo executionInfo = asyncResponse.getExecutionInfo();
389+
assertNotNull(executionInfo);
390+
EsqlExecutionInfo.Cluster clusterA = executionInfo.getCluster("cluster-a");
391+
assertThat(clusterA.getStatus(), not(equalTo(EsqlExecutionInfo.Cluster.Status.RUNNING)));
392+
}
393+
});
394+
395+
/* at this point:
396+
* the query against cluster-a should be finished
397+
* the query against remote-b should be running (blocked on the PauseFieldPlugin.allowEmitting CountDown)
398+
* the query against the local cluster should be running because it has a STATS clause that needs to wait on remote-b
399+
*/
400+
401+
// run the stop query
402+
var stopRequest = new AsyncStopRequest(asyncExecutionId.get());
403+
var stopAction = client().execute(EsqlAsyncStopAction.INSTANCE, stopRequest);
404+
// allow remoteB query to proceed
405+
PauseFieldPlugin.allowEmitting.countDown();
406+
407+
// Since part of the query has not been stopped, we expect some result to emerge here
408+
try (EsqlQueryResponse asyncResponse = stopAction.actionGet(1, TimeUnit.SECONDS)) {
409+
assertThat(asyncResponse.isRunning(), is(false));
410+
assertThat(asyncResponse.columns().size(), equalTo(1));
411+
assertThat(asyncResponse.values().hasNext(), is(true));
412+
} finally {
413+
AcknowledgedResponse acknowledgedResponse = deleteAsyncId(asyncExecutionId.get());
414+
assertThat(acknowledgedResponse.isAcknowledged(), is(true));
415+
}
416+
}
417+
360418
protected EsqlQueryResponse runAsyncQuery(String query, Boolean ccsMetadata, QueryBuilder filter, TimeValue waitCompletionTime) {
361419
EsqlQueryRequest request = EsqlQueryRequest.asyncEsqlQueryRequest();
362420
request.query(query);

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ public class TransportEsqlQueryAction extends HandledTransportAction<EsqlQueryRe
7070
private final LookupFromIndexService lookupFromIndexService;
7171
private final AsyncTaskManagementService<EsqlQueryRequest, EsqlQueryResponse, EsqlQueryTask> asyncTaskManagementService;
7272
private final RemoteClusterService remoteClusterService;
73-
73+
// Listeners for active async queries
7474
private final Map<String, SubscribableListener<EsqlQueryResponse>> asyncListeners = ConcurrentCollections.newConcurrentMap();
7575

7676
@Inject

0 commit comments

Comments
 (0)