Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -53,65 +53,71 @@ public void testStopQuery() throws Exception {
includeCCSMetadata.v1(),
Map.of("page_size", 1, "data_partitioning", "shard", "task_concurrency", 1)
);
try {
// wait until we know that the query against 'remote-b:blocking' has started
CountingPauseFieldPlugin.startEmitting.await(30, TimeUnit.SECONDS);

// wait until the query of 'cluster-a:logs-*' has finished (it is not blocked since we are not searching the 'blocking' index on
// it)
waitForCluster(client(), REMOTE_CLUSTER_1, asyncExecutionId);
waitForCluster(client(), LOCAL_CLUSTER, asyncExecutionId);

/* at this point:
* the query against cluster-a should be finished
* the query against remote-b should be running (blocked on the PauseFieldPlugin.allowEmitting CountDown)
* the query against the local cluster should be running because it has a STATS clause that needs to wait on remote-b
*/

// run the stop query
AsyncStopRequest stopRequest = new AsyncStopRequest(asyncExecutionId);
ActionFuture<EsqlQueryResponse> stopAction = client().execute(EsqlAsyncStopAction.INSTANCE, stopRequest);
assertBusy(() -> {
List<TaskInfo> tasks = getDriverTasks(client(REMOTE_CLUSTER_2));
List<TaskInfo> reduceTasks = tasks.stream()
.filter(t -> t.description().contains("_LuceneSourceOperator") == false)
.toList();
assertThat(reduceTasks, empty());
});
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I know this assertBusy code isn't changed, but I don't think I understand what it is doing. I think this is waiting for the drivers to be cancelled and all the Lucene operations to no longer present? If yes, can you add a comment to that effect for later help in understanding.

Copy link
Contributor Author

@smalyshev smalyshev Feb 11, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TBH I don't fully understand it myself, it's Nhat's code, but it looks like it checks that the drivers don't have any other tasks except _LuceneSourceOperator. I am not 100% sure why _LuceneSourceOperator is excluded though. I personally don't feel confident enough with this to comment it, but this is not the purpose of this patch.

// allow remoteB query to proceed
CountingPauseFieldPlugin.allowEmitting.countDown();

// Since part of the query has not been stopped, we expect some result to emerge here
try (EsqlQueryResponse asyncResponse = stopAction.actionGet(30, TimeUnit.SECONDS)) {
// Check that we did not process all the fields on remote-b
// Should not be getting more than one page here, and we set page size to 1
assertThat(CountingPauseFieldPlugin.count.get(), lessThanOrEqualTo(1L));
assertThat(asyncResponse.isRunning(), is(false));
assertThat(asyncResponse.columns().size(), equalTo(1));
assertThat(asyncResponse.values().hasNext(), is(true));
Iterator<Object> row = asyncResponse.values().next();
// sum of 0-9 is 45, and sum of 0-9 squared is 285
assertThat(row.next(), equalTo(330L));

// wait until we know that the query against 'remote-b:blocking' has started
CountingPauseFieldPlugin.startEmitting.await(30, TimeUnit.SECONDS);

// wait until the query of 'cluster-a:logs-*' has finished (it is not blocked since we are not searching the 'blocking' index on it)
waitForCluster(client(), REMOTE_CLUSTER_1, asyncExecutionId);
waitForCluster(client(), LOCAL_CLUSTER, asyncExecutionId);

/* at this point:
* the query against cluster-a should be finished
* the query against remote-b should be running (blocked on the PauseFieldPlugin.allowEmitting CountDown)
* the query against the local cluster should be running because it has a STATS clause that needs to wait on remote-b
*/

// run the stop query
AsyncStopRequest stopRequest = new AsyncStopRequest(asyncExecutionId);
ActionFuture<EsqlQueryResponse> stopAction = client().execute(EsqlAsyncStopAction.INSTANCE, stopRequest);
assertBusy(() -> {
List<TaskInfo> tasks = getDriverTasks(client(REMOTE_CLUSTER_2));
List<TaskInfo> reduceTasks = tasks.stream().filter(t -> t.description().contains("_LuceneSourceOperator") == false).toList();
assertThat(reduceTasks, empty());
});
// allow remoteB query to proceed
CountingPauseFieldPlugin.allowEmitting.countDown();

// Since part of the query has not been stopped, we expect some result to emerge here
try (EsqlQueryResponse asyncResponse = stopAction.actionGet(30, TimeUnit.SECONDS)) {
// Check that we did not process all the fields on remote-b
// Should not be getting more than one page here, and we set page size to 1
assertThat(CountingPauseFieldPlugin.count.get(), lessThanOrEqualTo(1L));
assertThat(asyncResponse.isRunning(), is(false));
assertThat(asyncResponse.columns().size(), equalTo(1));
assertThat(asyncResponse.values().hasNext(), is(true));
Iterator<Object> row = asyncResponse.values().next();
// sum of 0-9 is 45, and sum of 0-9 squared is 285
assertThat(row.next(), equalTo(330L));

EsqlExecutionInfo executionInfo = asyncResponse.getExecutionInfo();
assertNotNull(executionInfo);
assertThat(executionInfo.isCrossClusterSearch(), is(true));
long overallTookMillis = executionInfo.overallTook().millis();
assertThat(overallTookMillis, greaterThanOrEqualTo(0L));
assertThat(executionInfo.clusterAliases(), equalTo(Set.of(LOCAL_CLUSTER, REMOTE_CLUSTER_1, REMOTE_CLUSTER_2)));
assertThat(executionInfo.isPartial(), equalTo(true));

EsqlExecutionInfo.Cluster remoteCluster = executionInfo.getCluster(REMOTE_CLUSTER_1);
assertThat(remoteCluster.getIndexExpression(), equalTo("logs-*"));
assertClusterInfoSuccess(remoteCluster, remote1NumShards);

EsqlExecutionInfo.Cluster remote2Cluster = executionInfo.getCluster(REMOTE_CLUSTER_2);
assertThat(remote2Cluster.getIndexExpression(), equalTo("blocking"));
assertThat(remote2Cluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.PARTIAL));

EsqlExecutionInfo.Cluster localCluster = executionInfo.getCluster(LOCAL_CLUSTER);
assertThat(localCluster.getIndexExpression(), equalTo("logs-*"));
assertClusterInfoSuccess(localCluster, localNumShards);

assertClusterMetadataInResponse(asyncResponse, responseExpectMeta, 3);
EsqlExecutionInfo executionInfo = asyncResponse.getExecutionInfo();
assertNotNull(executionInfo);
assertThat(executionInfo.isCrossClusterSearch(), is(true));
long overallTookMillis = executionInfo.overallTook().millis();
assertThat(overallTookMillis, greaterThanOrEqualTo(0L));
assertThat(executionInfo.clusterAliases(), equalTo(Set.of(LOCAL_CLUSTER, REMOTE_CLUSTER_1, REMOTE_CLUSTER_2)));
assertThat(executionInfo.isPartial(), equalTo(true));

EsqlExecutionInfo.Cluster remoteCluster = executionInfo.getCluster(REMOTE_CLUSTER_1);
assertThat(remoteCluster.getIndexExpression(), equalTo("logs-*"));
assertClusterInfoSuccess(remoteCluster, remote1NumShards);

EsqlExecutionInfo.Cluster remote2Cluster = executionInfo.getCluster(REMOTE_CLUSTER_2);
assertThat(remote2Cluster.getIndexExpression(), equalTo("blocking"));
assertThat(remote2Cluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.PARTIAL));

EsqlExecutionInfo.Cluster localCluster = executionInfo.getCluster(LOCAL_CLUSTER);
assertThat(localCluster.getIndexExpression(), equalTo("logs-*"));
assertClusterInfoSuccess(localCluster, localNumShards);

assertClusterMetadataInResponse(asyncResponse, responseExpectMeta, 3);
}
} finally {
// Ensure proper cleanup if the test fails
CountingPauseFieldPlugin.allowEmitting.countDown();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this the only substantive change in this PR? It's hard to tell what is new and not as even the side-by-side view in GH is not helping?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, this makes it so if the test fails, the locks are still removed and the async request is cleared. I've got some very confusing rare failures in the logs, and I suspect this is caused by one test's setup leaking into another. So I want to make it cleaner.

assertAcked(deleteAsyncId(client(), asyncExecutionId));
}
}
Expand All @@ -131,63 +137,66 @@ public void testStopQueryLocal() throws Exception {
includeCCSMetadata.v1()
);

// wait until we know that the query against 'remote-b:blocking' has started
SimplePauseFieldPlugin.startEmitting.await(30, TimeUnit.SECONDS);

// wait until the remotes are done
waitForCluster(client(), REMOTE_CLUSTER_1, asyncExecutionId);
waitForCluster(client(), REMOTE_CLUSTER_2, asyncExecutionId);
try {
// wait until we know that the query against 'remote-b:blocking' has started
SimplePauseFieldPlugin.startEmitting.await(30, TimeUnit.SECONDS);

// wait until the remotes are done
waitForCluster(client(), REMOTE_CLUSTER_1, asyncExecutionId);
waitForCluster(client(), REMOTE_CLUSTER_2, asyncExecutionId);

/* at this point:
* the query against remotes should be finished
* the query against the local cluster should be running because it's blocked
*/

// run the stop query
AsyncStopRequest stopRequest = new AsyncStopRequest(asyncExecutionId);
ActionFuture<EsqlQueryResponse> stopAction = client().execute(EsqlAsyncStopAction.INSTANCE, stopRequest);
// ensure stop operation is running
assertBusy(() -> {
try (EsqlQueryResponse asyncResponse = getAsyncResponse(client(), asyncExecutionId)) {
EsqlExecutionInfo executionInfo = asyncResponse.getExecutionInfo();
assertNotNull(executionInfo);
assertThat(executionInfo.isPartial(), is(true));
}
});
// allow local query to proceed
SimplePauseFieldPlugin.allowEmitting.countDown();

// Since part of the query has not been stopped, we expect some result to emerge here
try (EsqlQueryResponse asyncResponse = stopAction.actionGet(30, TimeUnit.SECONDS)) {
assertThat(asyncResponse.isRunning(), is(false));
assertThat(asyncResponse.columns().size(), equalTo(1));
assertThat(asyncResponse.values().hasNext(), is(true));
Iterator<Object> row = asyncResponse.values().next();
// sum of 0-9 squared is 285, from two remotes it's 570
assertThat(row.next(), equalTo(570L));

/* at this point:
* the query against remotes should be finished
* the query against the local cluster should be running because it's blocked
*/

// run the stop query
AsyncStopRequest stopRequest = new AsyncStopRequest(asyncExecutionId);
ActionFuture<EsqlQueryResponse> stopAction = client().execute(EsqlAsyncStopAction.INSTANCE, stopRequest);
// ensure stop operation is running
assertBusy(() -> {
try (EsqlQueryResponse asyncResponse = getAsyncResponse(client(), asyncExecutionId)) {
EsqlExecutionInfo executionInfo = asyncResponse.getExecutionInfo();
assertNotNull(executionInfo);
assertThat(executionInfo.isPartial(), is(true));
assertThat(executionInfo.isCrossClusterSearch(), is(true));
long overallTookMillis = executionInfo.overallTook().millis();
assertThat(overallTookMillis, greaterThanOrEqualTo(0L));
assertThat(executionInfo.clusterAliases(), equalTo(Set.of(LOCAL_CLUSTER, REMOTE_CLUSTER_1, REMOTE_CLUSTER_2)));
assertThat(executionInfo.isPartial(), equalTo(true));

EsqlExecutionInfo.Cluster remoteCluster = executionInfo.getCluster(REMOTE_CLUSTER_1);
assertThat(remoteCluster.getIndexExpression(), equalTo("logs-*"));
assertClusterInfoSuccess(remoteCluster, remote1NumShards);

EsqlExecutionInfo.Cluster remote2Cluster = executionInfo.getCluster(REMOTE_CLUSTER_2);
assertThat(remote2Cluster.getIndexExpression(), equalTo("logs-*"));
assertClusterInfoSuccess(remote2Cluster, remote2NumShards);

EsqlExecutionInfo.Cluster localCluster = executionInfo.getCluster(LOCAL_CLUSTER);
assertThat(localCluster.getIndexExpression(), equalTo("blocking"));
assertThat(localCluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.PARTIAL));

assertClusterMetadataInResponse(asyncResponse, responseExpectMeta, 3);
}
});
// allow local query to proceed
SimplePauseFieldPlugin.allowEmitting.countDown();

// Since part of the query has not been stopped, we expect some result to emerge here
try (EsqlQueryResponse asyncResponse = stopAction.actionGet(30, TimeUnit.SECONDS)) {
assertThat(asyncResponse.isRunning(), is(false));
assertThat(asyncResponse.columns().size(), equalTo(1));
assertThat(asyncResponse.values().hasNext(), is(true));
Iterator<Object> row = asyncResponse.values().next();
// sum of 0-9 squared is 285, from two remotes it's 570
assertThat(row.next(), equalTo(570L));

EsqlExecutionInfo executionInfo = asyncResponse.getExecutionInfo();
assertNotNull(executionInfo);
assertThat(executionInfo.isCrossClusterSearch(), is(true));
long overallTookMillis = executionInfo.overallTook().millis();
assertThat(overallTookMillis, greaterThanOrEqualTo(0L));
assertThat(executionInfo.clusterAliases(), equalTo(Set.of(LOCAL_CLUSTER, REMOTE_CLUSTER_1, REMOTE_CLUSTER_2)));
assertThat(executionInfo.isPartial(), equalTo(true));

EsqlExecutionInfo.Cluster remoteCluster = executionInfo.getCluster(REMOTE_CLUSTER_1);
assertThat(remoteCluster.getIndexExpression(), equalTo("logs-*"));
assertClusterInfoSuccess(remoteCluster, remote1NumShards);

EsqlExecutionInfo.Cluster remote2Cluster = executionInfo.getCluster(REMOTE_CLUSTER_2);
assertThat(remote2Cluster.getIndexExpression(), equalTo("logs-*"));
assertClusterInfoSuccess(remote2Cluster, remote2NumShards);

EsqlExecutionInfo.Cluster localCluster = executionInfo.getCluster(LOCAL_CLUSTER);
assertThat(localCluster.getIndexExpression(), equalTo("blocking"));
assertThat(localCluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.PARTIAL));

assertClusterMetadataInResponse(asyncResponse, responseExpectMeta, 3);
} finally {
SimplePauseFieldPlugin.allowEmitting.countDown();
assertAcked(deleteAsyncId(client(), asyncExecutionId));
}
}
Expand All @@ -205,30 +214,33 @@ public void testStopQueryLocalNoRemotes() throws Exception {
includeCCSMetadata.v1()
);

// wait until we know that the query against 'remote-b:blocking' has started
SimplePauseFieldPlugin.startEmitting.await(30, TimeUnit.SECONDS);

/* at this point:
* the query against the local cluster should be running because it's blocked
*/

// run the stop query
var stopRequest = new AsyncStopRequest(asyncExecutionId);
var stopAction = client().execute(EsqlAsyncStopAction.INSTANCE, stopRequest);
// allow local query to proceed
SimplePauseFieldPlugin.allowEmitting.countDown();

try (EsqlQueryResponse asyncResponse = stopAction.actionGet(30, TimeUnit.SECONDS)) {
assertThat(asyncResponse.isRunning(), is(false));
assertThat(asyncResponse.columns().size(), equalTo(1));
assertThat(asyncResponse.values().hasNext(), is(true));
Iterator<Object> row = asyncResponse.values().next();
assertThat((long) row.next(), greaterThanOrEqualTo(0L));

EsqlExecutionInfo executionInfo = asyncResponse.getExecutionInfo();
assertNotNull(executionInfo);
assertThat(executionInfo.isCrossClusterSearch(), is(false));
try {
// wait until we know that the query against 'remote-b:blocking' has started
SimplePauseFieldPlugin.startEmitting.await(30, TimeUnit.SECONDS);

/* at this point:
* the query against the local cluster should be running because it's blocked
*/

// run the stop query
var stopRequest = new AsyncStopRequest(asyncExecutionId);
var stopAction = client().execute(EsqlAsyncStopAction.INSTANCE, stopRequest);
// allow local query to proceed
SimplePauseFieldPlugin.allowEmitting.countDown();

try (EsqlQueryResponse asyncResponse = stopAction.actionGet(30, TimeUnit.SECONDS)) {
assertThat(asyncResponse.isRunning(), is(false));
assertThat(asyncResponse.columns().size(), equalTo(1));
assertThat(asyncResponse.values().hasNext(), is(true));
Iterator<Object> row = asyncResponse.values().next();
assertThat((long) row.next(), greaterThanOrEqualTo(0L));

EsqlExecutionInfo executionInfo = asyncResponse.getExecutionInfo();
assertNotNull(executionInfo);
assertThat(executionInfo.isCrossClusterSearch(), is(false));
}
} finally {
SimplePauseFieldPlugin.allowEmitting.countDown();
assertAcked(deleteAsyncId(client(), asyncExecutionId));
}
}
Expand Down