Skip to content

Commit 1fa422d

Browse files
authored
Backport fix for testStopQueryLocal from main (#123905)
1 parent eeccb04 commit 1fa422d

File tree

1 file changed

+74
-68
lines changed

1 file changed

+74
-68
lines changed

x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterAsyncQueryIT.java

Lines changed: 74 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -360,63 +360,66 @@ public void testStopQueryLocal() throws Exception {
360360
includeCCSMetadata.v1()
361361
);
362362

363-
// wait until we know that the query against 'remote-b:blocking' has started
364-
SimplePauseFieldPlugin.startEmitting.await(30, TimeUnit.SECONDS);
365-
366-
// wait until the remotes are done
367-
waitForCluster(client(), REMOTE_CLUSTER_1, asyncExecutionId);
368-
waitForCluster(client(), REMOTE_CLUSTER_2, asyncExecutionId);
369-
370-
/* at this point:
371-
* the query against remotes should be finished
372-
* the query against the local cluster should be running because it's blocked
373-
*/
363+
try {
364+
// wait until we know that the query against 'remote-b:blocking' has started
365+
SimplePauseFieldPlugin.startEmitting.await(30, TimeUnit.SECONDS);
366+
367+
// wait until the remotes are done
368+
waitForCluster(client(), REMOTE_CLUSTER_1, asyncExecutionId);
369+
waitForCluster(client(), REMOTE_CLUSTER_2, asyncExecutionId);
370+
371+
/* at this point:
372+
* the query against remotes should be finished
373+
* the query against the local cluster should be running because it's blocked
374+
*/
375+
376+
// run the stop query
377+
AsyncStopRequest stopRequest = new AsyncStopRequest(asyncExecutionId);
378+
ActionFuture<EsqlQueryResponse> stopAction = client().execute(EsqlAsyncStopAction.INSTANCE, stopRequest);
379+
// ensure stop operation is running
380+
assertBusy(() -> {
381+
try (EsqlQueryResponse asyncResponse = getAsyncResponse(client(), asyncExecutionId)) {
382+
EsqlExecutionInfo executionInfo = asyncResponse.getExecutionInfo();
383+
assertNotNull(executionInfo);
384+
assertThat(executionInfo.isPartial(), is(true));
385+
}
386+
});
387+
// allow local query to proceed
388+
SimplePauseFieldPlugin.allowEmitting.countDown();
389+
390+
// Since part of the query has not been stopped, we expect some result to emerge here
391+
try (EsqlQueryResponse asyncResponse = stopAction.actionGet(30, TimeUnit.SECONDS)) {
392+
assertThat(asyncResponse.isRunning(), is(false));
393+
assertThat(asyncResponse.columns().size(), equalTo(1));
394+
assertThat(asyncResponse.values().hasNext(), is(true));
395+
Iterator<Object> row = asyncResponse.values().next();
396+
// sum of 0-9 squared is 285, from two remotes it's 570
397+
assertThat(row.next(), equalTo(570L));
374398

375-
// run the stop query
376-
AsyncStopRequest stopRequest = new AsyncStopRequest(asyncExecutionId);
377-
ActionFuture<EsqlQueryResponse> stopAction = client().execute(EsqlAsyncStopAction.INSTANCE, stopRequest);
378-
// ensure stop operation is running
379-
assertBusy(() -> {
380-
try (EsqlQueryResponse asyncResponse = getAsyncResponse(client(), asyncExecutionId)) {
381399
EsqlExecutionInfo executionInfo = asyncResponse.getExecutionInfo();
382400
assertNotNull(executionInfo);
383-
assertThat(executionInfo.isPartial(), is(true));
384-
}
385-
});
386-
// allow local query to proceed
387-
SimplePauseFieldPlugin.allowEmitting.countDown();
388-
389-
// Since part of the query has not been stopped, we expect some result to emerge here
390-
try (EsqlQueryResponse asyncResponse = stopAction.actionGet(30, TimeUnit.SECONDS)) {
391-
assertThat(asyncResponse.isRunning(), is(false));
392-
assertThat(asyncResponse.columns().size(), equalTo(1));
393-
assertThat(asyncResponse.values().hasNext(), is(true));
394-
Iterator<Object> row = asyncResponse.values().next();
395-
// sum of 0-9 squared is 285, from two remotes it's 570
396-
assertThat(row.next(), equalTo(570L));
397-
398-
EsqlExecutionInfo executionInfo = asyncResponse.getExecutionInfo();
399-
assertNotNull(executionInfo);
400-
assertThat(executionInfo.isCrossClusterSearch(), is(true));
401-
long overallTookMillis = executionInfo.overallTook().millis();
402-
assertThat(overallTookMillis, greaterThanOrEqualTo(0L));
403-
assertThat(executionInfo.clusterAliases(), equalTo(Set.of(LOCAL_CLUSTER, REMOTE_CLUSTER_1, REMOTE_CLUSTER_2)));
404-
assertThat(executionInfo.isPartial(), equalTo(true));
401+
assertThat(executionInfo.isCrossClusterSearch(), is(true));
402+
long overallTookMillis = executionInfo.overallTook().millis();
403+
assertThat(overallTookMillis, greaterThanOrEqualTo(0L));
404+
assertThat(executionInfo.clusterAliases(), equalTo(Set.of(LOCAL_CLUSTER, REMOTE_CLUSTER_1, REMOTE_CLUSTER_2)));
405+
assertThat(executionInfo.isPartial(), equalTo(true));
405406

406-
EsqlExecutionInfo.Cluster remoteCluster = executionInfo.getCluster(REMOTE_CLUSTER_1);
407-
assertThat(remoteCluster.getIndexExpression(), equalTo("logs-*"));
408-
assertClusterInfoSuccess(remoteCluster, remote1NumShards);
407+
EsqlExecutionInfo.Cluster remoteCluster = executionInfo.getCluster(REMOTE_CLUSTER_1);
408+
assertThat(remoteCluster.getIndexExpression(), equalTo("logs-*"));
409+
assertClusterInfoSuccess(remoteCluster, remote1NumShards);
409410

410-
EsqlExecutionInfo.Cluster remote2Cluster = executionInfo.getCluster(REMOTE_CLUSTER_2);
411-
assertThat(remote2Cluster.getIndexExpression(), equalTo("logs-*"));
412-
assertClusterInfoSuccess(remote2Cluster, remote2NumShards);
411+
EsqlExecutionInfo.Cluster remote2Cluster = executionInfo.getCluster(REMOTE_CLUSTER_2);
412+
assertThat(remote2Cluster.getIndexExpression(), equalTo("logs-*"));
413+
assertClusterInfoSuccess(remote2Cluster, remote2NumShards);
413414

414-
EsqlExecutionInfo.Cluster localCluster = executionInfo.getCluster(LOCAL_CLUSTER);
415-
assertThat(localCluster.getIndexExpression(), equalTo("blocking"));
416-
assertThat(localCluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.PARTIAL));
415+
EsqlExecutionInfo.Cluster localCluster = executionInfo.getCluster(LOCAL_CLUSTER);
416+
assertThat(localCluster.getIndexExpression(), equalTo("blocking"));
417+
assertThat(localCluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.PARTIAL));
417418

418-
assertClusterMetadataInResponse(asyncResponse, responseExpectMeta, 3);
419+
assertClusterMetadataInResponse(asyncResponse, responseExpectMeta, 3);
420+
}
419421
} finally {
422+
SimplePauseFieldPlugin.allowEmitting.countDown();
420423
assertAcked(deleteAsyncId(client(), asyncExecutionId));
421424
}
422425
}
@@ -434,30 +437,33 @@ public void testStopQueryLocalNoRemotes() throws Exception {
434437
includeCCSMetadata.v1()
435438
);
436439

437-
// wait until we know that the query against 'remote-b:blocking' has started
438-
SimplePauseFieldPlugin.startEmitting.await(30, TimeUnit.SECONDS);
440+
try {
441+
// wait until we know that the query against 'remote-b:blocking' has started
442+
SimplePauseFieldPlugin.startEmitting.await(30, TimeUnit.SECONDS);
439443

440-
/* at this point:
441-
* the query against the local cluster should be running because it's blocked
442-
*/
444+
/* at this point:
445+
* the query against the local cluster should be running because it's blocked
446+
*/
443447

444-
// run the stop query
445-
var stopRequest = new AsyncStopRequest(asyncExecutionId);
446-
var stopAction = client().execute(EsqlAsyncStopAction.INSTANCE, stopRequest);
447-
// allow local query to proceed
448-
SimplePauseFieldPlugin.allowEmitting.countDown();
448+
// run the stop query
449+
var stopRequest = new AsyncStopRequest(asyncExecutionId);
450+
var stopAction = client().execute(EsqlAsyncStopAction.INSTANCE, stopRequest);
451+
// allow local query to proceed
452+
SimplePauseFieldPlugin.allowEmitting.countDown();
449453

450-
try (EsqlQueryResponse asyncResponse = stopAction.actionGet(30, TimeUnit.SECONDS)) {
451-
assertThat(asyncResponse.isRunning(), is(false));
452-
assertThat(asyncResponse.columns().size(), equalTo(1));
453-
assertThat(asyncResponse.values().hasNext(), is(true));
454-
Iterator<Object> row = asyncResponse.values().next();
455-
assertThat((long) row.next(), greaterThanOrEqualTo(0L));
454+
try (EsqlQueryResponse asyncResponse = stopAction.actionGet(30, TimeUnit.SECONDS)) {
455+
assertThat(asyncResponse.isRunning(), is(false));
456+
assertThat(asyncResponse.columns().size(), equalTo(1));
457+
assertThat(asyncResponse.values().hasNext(), is(true));
458+
Iterator<Object> row = asyncResponse.values().next();
459+
assertThat((long) row.next(), greaterThanOrEqualTo(0L));
456460

457-
EsqlExecutionInfo executionInfo = asyncResponse.getExecutionInfo();
458-
assertNotNull(executionInfo);
459-
assertThat(executionInfo.isCrossClusterSearch(), is(false));
461+
EsqlExecutionInfo executionInfo = asyncResponse.getExecutionInfo();
462+
assertNotNull(executionInfo);
463+
assertThat(executionInfo.isCrossClusterSearch(), is(false));
464+
}
460465
} finally {
466+
SimplePauseFieldPlugin.allowEmitting.countDown();
461467
assertAcked(deleteAsyncId(client(), asyncExecutionId));
462468
}
463469
}

0 commit comments

Comments
 (0)