Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -53,65 +53,71 @@ public void testStopQuery() throws Exception {
includeCCSMetadata.v1(),
Map.of("page_size", 1, "data_partitioning", "shard", "task_concurrency", 1)
);
try {
// wait until we know that the query against 'remote-b:blocking' has started
CountingPauseFieldPlugin.startEmitting.await(30, TimeUnit.SECONDS);

// wait until the query of 'cluster-a:logs-*' has finished (it is not blocked since we are not searching the 'blocking' index on
// it)
waitForCluster(client(), REMOTE_CLUSTER_1, asyncExecutionId);
waitForCluster(client(), LOCAL_CLUSTER, asyncExecutionId);

/* at this point:
* the query against cluster-a should be finished
* the query against remote-b should be running (blocked on the PauseFieldPlugin.allowEmitting CountDown)
* the query against the local cluster should be running because it has a STATS clause that needs to wait on remote-b
*/

// run the stop query
AsyncStopRequest stopRequest = new AsyncStopRequest(asyncExecutionId);
ActionFuture<EsqlQueryResponse> stopAction = client().execute(EsqlAsyncStopAction.INSTANCE, stopRequest);
assertBusy(() -> {
List<TaskInfo> tasks = getDriverTasks(client(REMOTE_CLUSTER_2));
List<TaskInfo> reduceTasks = tasks.stream()
.filter(t -> t.description().contains("_LuceneSourceOperator") == false)
.toList();
assertThat(reduceTasks, empty());
});
// allow remoteB query to proceed
CountingPauseFieldPlugin.allowEmitting.countDown();

// Since part of the query has not been stopped, we expect some result to emerge here
try (EsqlQueryResponse asyncResponse = stopAction.actionGet(30, TimeUnit.SECONDS)) {
// Check that we did not process all the fields on remote-b
// Should not be getting more than one page here, and we set page size to 1
assertThat(CountingPauseFieldPlugin.count.get(), lessThanOrEqualTo(1L));
assertThat(asyncResponse.isRunning(), is(false));
assertThat(asyncResponse.columns().size(), equalTo(1));
assertThat(asyncResponse.values().hasNext(), is(true));
Iterator<Object> row = asyncResponse.values().next();
// sum of 0-9 is 45, and sum of 0-9 squared is 285
assertThat(row.next(), equalTo(330L));

// wait until we know that the query against 'remote-b:blocking' has started
CountingPauseFieldPlugin.startEmitting.await(30, TimeUnit.SECONDS);

// wait until the query of 'cluster-a:logs-*' has finished (it is not blocked since we are not searching the 'blocking' index on it)
waitForCluster(client(), REMOTE_CLUSTER_1, asyncExecutionId);
waitForCluster(client(), LOCAL_CLUSTER, asyncExecutionId);

/* at this point:
* the query against cluster-a should be finished
* the query against remote-b should be running (blocked on the PauseFieldPlugin.allowEmitting CountDown)
* the query against the local cluster should be running because it has a STATS clause that needs to wait on remote-b
*/

// run the stop query
AsyncStopRequest stopRequest = new AsyncStopRequest(asyncExecutionId);
ActionFuture<EsqlQueryResponse> stopAction = client().execute(EsqlAsyncStopAction.INSTANCE, stopRequest);
assertBusy(() -> {
List<TaskInfo> tasks = getDriverTasks(client(REMOTE_CLUSTER_2));
List<TaskInfo> reduceTasks = tasks.stream().filter(t -> t.description().contains("_LuceneSourceOperator") == false).toList();
assertThat(reduceTasks, empty());
});
// allow remoteB query to proceed
CountingPauseFieldPlugin.allowEmitting.countDown();

// Since part of the query has not been stopped, we expect some result to emerge here
try (EsqlQueryResponse asyncResponse = stopAction.actionGet(30, TimeUnit.SECONDS)) {
// Check that we did not process all the fields on remote-b
// Should not be getting more than one page here, and we set page size to 1
assertThat(CountingPauseFieldPlugin.count.get(), lessThanOrEqualTo(1L));
assertThat(asyncResponse.isRunning(), is(false));
assertThat(asyncResponse.columns().size(), equalTo(1));
assertThat(asyncResponse.values().hasNext(), is(true));
Iterator<Object> row = asyncResponse.values().next();
// sum of 0-9 is 45, and sum of 0-9 squared is 285
assertThat(row.next(), equalTo(330L));

EsqlExecutionInfo executionInfo = asyncResponse.getExecutionInfo();
assertNotNull(executionInfo);
assertThat(executionInfo.isCrossClusterSearch(), is(true));
long overallTookMillis = executionInfo.overallTook().millis();
assertThat(overallTookMillis, greaterThanOrEqualTo(0L));
assertThat(executionInfo.clusterAliases(), equalTo(Set.of(LOCAL_CLUSTER, REMOTE_CLUSTER_1, REMOTE_CLUSTER_2)));
assertThat(executionInfo.isPartial(), equalTo(true));

EsqlExecutionInfo.Cluster remoteCluster = executionInfo.getCluster(REMOTE_CLUSTER_1);
assertThat(remoteCluster.getIndexExpression(), equalTo("logs-*"));
assertClusterInfoSuccess(remoteCluster, remote1NumShards);

EsqlExecutionInfo.Cluster remote2Cluster = executionInfo.getCluster(REMOTE_CLUSTER_2);
assertThat(remote2Cluster.getIndexExpression(), equalTo("blocking"));
assertThat(remote2Cluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.PARTIAL));

EsqlExecutionInfo.Cluster localCluster = executionInfo.getCluster(LOCAL_CLUSTER);
assertThat(localCluster.getIndexExpression(), equalTo("logs-*"));
assertClusterInfoSuccess(localCluster, localNumShards);

assertClusterMetadataInResponse(asyncResponse, responseExpectMeta, 3);
EsqlExecutionInfo executionInfo = asyncResponse.getExecutionInfo();
assertNotNull(executionInfo);
assertThat(executionInfo.isCrossClusterSearch(), is(true));
long overallTookMillis = executionInfo.overallTook().millis();
assertThat(overallTookMillis, greaterThanOrEqualTo(0L));
assertThat(executionInfo.clusterAliases(), equalTo(Set.of(LOCAL_CLUSTER, REMOTE_CLUSTER_1, REMOTE_CLUSTER_2)));
assertThat(executionInfo.isPartial(), equalTo(true));

EsqlExecutionInfo.Cluster remoteCluster = executionInfo.getCluster(REMOTE_CLUSTER_1);
assertThat(remoteCluster.getIndexExpression(), equalTo("logs-*"));
assertClusterInfoSuccess(remoteCluster, remote1NumShards);

EsqlExecutionInfo.Cluster remote2Cluster = executionInfo.getCluster(REMOTE_CLUSTER_2);
assertThat(remote2Cluster.getIndexExpression(), equalTo("blocking"));
assertThat(remote2Cluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.PARTIAL));

EsqlExecutionInfo.Cluster localCluster = executionInfo.getCluster(LOCAL_CLUSTER);
assertThat(localCluster.getIndexExpression(), equalTo("logs-*"));
assertClusterInfoSuccess(localCluster, localNumShards);

assertClusterMetadataInResponse(asyncResponse, responseExpectMeta, 3);
}
} finally {
// Ensure proper cleanup if the test fails
CountingPauseFieldPlugin.allowEmitting.countDown();
assertAcked(deleteAsyncId(client(), asyncExecutionId));
}
}
Expand All @@ -131,63 +137,66 @@ public void testStopQueryLocal() throws Exception {
includeCCSMetadata.v1()
);

// wait until we know that the query against 'remote-b:blocking' has started
SimplePauseFieldPlugin.startEmitting.await(30, TimeUnit.SECONDS);

// wait until the remotes are done
waitForCluster(client(), REMOTE_CLUSTER_1, asyncExecutionId);
waitForCluster(client(), REMOTE_CLUSTER_2, asyncExecutionId);
try {
// wait until we know that the query against 'remote-b:blocking' has started
SimplePauseFieldPlugin.startEmitting.await(30, TimeUnit.SECONDS);

// wait until the remotes are done
waitForCluster(client(), REMOTE_CLUSTER_1, asyncExecutionId);
waitForCluster(client(), REMOTE_CLUSTER_2, asyncExecutionId);

/* at this point:
* the query against remotes should be finished
* the query against the local cluster should be running because it's blocked
*/

// run the stop query
AsyncStopRequest stopRequest = new AsyncStopRequest(asyncExecutionId);
ActionFuture<EsqlQueryResponse> stopAction = client().execute(EsqlAsyncStopAction.INSTANCE, stopRequest);
// ensure stop operation is running
assertBusy(() -> {
try (EsqlQueryResponse asyncResponse = getAsyncResponse(client(), asyncExecutionId)) {
EsqlExecutionInfo executionInfo = asyncResponse.getExecutionInfo();
assertNotNull(executionInfo);
assertThat(executionInfo.isPartial(), is(true));
}
});
// allow local query to proceed
SimplePauseFieldPlugin.allowEmitting.countDown();

// Since part of the query has not been stopped, we expect some result to emerge here
try (EsqlQueryResponse asyncResponse = stopAction.actionGet(30, TimeUnit.SECONDS)) {
assertThat(asyncResponse.isRunning(), is(false));
assertThat(asyncResponse.columns().size(), equalTo(1));
assertThat(asyncResponse.values().hasNext(), is(true));
Iterator<Object> row = asyncResponse.values().next();
// sum of 0-9 squared is 285, from two remotes it's 570
assertThat(row.next(), equalTo(570L));

/* at this point:
* the query against remotes should be finished
* the query against the local cluster should be running because it's blocked
*/

// run the stop query
AsyncStopRequest stopRequest = new AsyncStopRequest(asyncExecutionId);
ActionFuture<EsqlQueryResponse> stopAction = client().execute(EsqlAsyncStopAction.INSTANCE, stopRequest);
// ensure stop operation is running
assertBusy(() -> {
try (EsqlQueryResponse asyncResponse = getAsyncResponse(client(), asyncExecutionId)) {
EsqlExecutionInfo executionInfo = asyncResponse.getExecutionInfo();
assertNotNull(executionInfo);
assertThat(executionInfo.isPartial(), is(true));
assertThat(executionInfo.isCrossClusterSearch(), is(true));
long overallTookMillis = executionInfo.overallTook().millis();
assertThat(overallTookMillis, greaterThanOrEqualTo(0L));
assertThat(executionInfo.clusterAliases(), equalTo(Set.of(LOCAL_CLUSTER, REMOTE_CLUSTER_1, REMOTE_CLUSTER_2)));
assertThat(executionInfo.isPartial(), equalTo(true));

EsqlExecutionInfo.Cluster remoteCluster = executionInfo.getCluster(REMOTE_CLUSTER_1);
assertThat(remoteCluster.getIndexExpression(), equalTo("logs-*"));
assertClusterInfoSuccess(remoteCluster, remote1NumShards);

EsqlExecutionInfo.Cluster remote2Cluster = executionInfo.getCluster(REMOTE_CLUSTER_2);
assertThat(remote2Cluster.getIndexExpression(), equalTo("logs-*"));
assertClusterInfoSuccess(remote2Cluster, remote2NumShards);

EsqlExecutionInfo.Cluster localCluster = executionInfo.getCluster(LOCAL_CLUSTER);
assertThat(localCluster.getIndexExpression(), equalTo("blocking"));
assertThat(localCluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.PARTIAL));

assertClusterMetadataInResponse(asyncResponse, responseExpectMeta, 3);
}
});
// allow local query to proceed
SimplePauseFieldPlugin.allowEmitting.countDown();

// Since part of the query has not been stopped, we expect some result to emerge here
try (EsqlQueryResponse asyncResponse = stopAction.actionGet(30, TimeUnit.SECONDS)) {
assertThat(asyncResponse.isRunning(), is(false));
assertThat(asyncResponse.columns().size(), equalTo(1));
assertThat(asyncResponse.values().hasNext(), is(true));
Iterator<Object> row = asyncResponse.values().next();
// sum of 0-9 squared is 285, from two remotes it's 570
assertThat(row.next(), equalTo(570L));

EsqlExecutionInfo executionInfo = asyncResponse.getExecutionInfo();
assertNotNull(executionInfo);
assertThat(executionInfo.isCrossClusterSearch(), is(true));
long overallTookMillis = executionInfo.overallTook().millis();
assertThat(overallTookMillis, greaterThanOrEqualTo(0L));
assertThat(executionInfo.clusterAliases(), equalTo(Set.of(LOCAL_CLUSTER, REMOTE_CLUSTER_1, REMOTE_CLUSTER_2)));
assertThat(executionInfo.isPartial(), equalTo(true));

EsqlExecutionInfo.Cluster remoteCluster = executionInfo.getCluster(REMOTE_CLUSTER_1);
assertThat(remoteCluster.getIndexExpression(), equalTo("logs-*"));
assertClusterInfoSuccess(remoteCluster, remote1NumShards);

EsqlExecutionInfo.Cluster remote2Cluster = executionInfo.getCluster(REMOTE_CLUSTER_2);
assertThat(remote2Cluster.getIndexExpression(), equalTo("logs-*"));
assertClusterInfoSuccess(remote2Cluster, remote2NumShards);

EsqlExecutionInfo.Cluster localCluster = executionInfo.getCluster(LOCAL_CLUSTER);
assertThat(localCluster.getIndexExpression(), equalTo("blocking"));
assertThat(localCluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.PARTIAL));

assertClusterMetadataInResponse(asyncResponse, responseExpectMeta, 3);
} finally {
SimplePauseFieldPlugin.allowEmitting.countDown();
assertAcked(deleteAsyncId(client(), asyncExecutionId));
}
}
Expand All @@ -205,30 +214,33 @@ public void testStopQueryLocalNoRemotes() throws Exception {
includeCCSMetadata.v1()
);

// wait until we know that the query against 'remote-b:blocking' has started
SimplePauseFieldPlugin.startEmitting.await(30, TimeUnit.SECONDS);

/* at this point:
* the query against the local cluster should be running because it's blocked
*/

// run the stop query
var stopRequest = new AsyncStopRequest(asyncExecutionId);
var stopAction = client().execute(EsqlAsyncStopAction.INSTANCE, stopRequest);
// allow local query to proceed
SimplePauseFieldPlugin.allowEmitting.countDown();

try (EsqlQueryResponse asyncResponse = stopAction.actionGet(30, TimeUnit.SECONDS)) {
assertThat(asyncResponse.isRunning(), is(false));
assertThat(asyncResponse.columns().size(), equalTo(1));
assertThat(asyncResponse.values().hasNext(), is(true));
Iterator<Object> row = asyncResponse.values().next();
assertThat((long) row.next(), greaterThanOrEqualTo(0L));

EsqlExecutionInfo executionInfo = asyncResponse.getExecutionInfo();
assertNotNull(executionInfo);
assertThat(executionInfo.isCrossClusterSearch(), is(false));
try {
// wait until we know that the query against 'remote-b:blocking' has started
SimplePauseFieldPlugin.startEmitting.await(30, TimeUnit.SECONDS);

/* at this point:
* the query against the local cluster should be running because it's blocked
*/

// run the stop query
var stopRequest = new AsyncStopRequest(asyncExecutionId);
var stopAction = client().execute(EsqlAsyncStopAction.INSTANCE, stopRequest);
// allow local query to proceed
SimplePauseFieldPlugin.allowEmitting.countDown();

try (EsqlQueryResponse asyncResponse = stopAction.actionGet(30, TimeUnit.SECONDS)) {
assertThat(asyncResponse.isRunning(), is(false));
assertThat(asyncResponse.columns().size(), equalTo(1));
assertThat(asyncResponse.values().hasNext(), is(true));
Iterator<Object> row = asyncResponse.values().next();
assertThat((long) row.next(), greaterThanOrEqualTo(0L));

EsqlExecutionInfo executionInfo = asyncResponse.getExecutionInfo();
assertNotNull(executionInfo);
assertThat(executionInfo.isCrossClusterSearch(), is(false));
}
} finally {
SimplePauseFieldPlugin.allowEmitting.countDown();
assertAcked(deleteAsyncId(client(), asyncExecutionId));
}
}
Expand Down