Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -360,63 +360,66 @@ public void testStopQueryLocal() throws Exception {
includeCCSMetadata.v1()
);

// wait until we know that the query against 'remote-b:blocking' has started
SimplePauseFieldPlugin.startEmitting.await(30, TimeUnit.SECONDS);

// wait until the remotes are done
waitForCluster(client(), REMOTE_CLUSTER_1, asyncExecutionId);
waitForCluster(client(), REMOTE_CLUSTER_2, asyncExecutionId);

/* at this point:
* the query against remotes should be finished
* the query against the local cluster should be running because it's blocked
*/
try {
// wait until we know that the query against 'remote-b:blocking' has started
SimplePauseFieldPlugin.startEmitting.await(30, TimeUnit.SECONDS);

// wait until the remotes are done
waitForCluster(client(), REMOTE_CLUSTER_1, asyncExecutionId);
waitForCluster(client(), REMOTE_CLUSTER_2, asyncExecutionId);

/* at this point:
* the query against remotes should be finished
* the query against the local cluster should be running because it's blocked
*/

// run the stop query
AsyncStopRequest stopRequest = new AsyncStopRequest(asyncExecutionId);
ActionFuture<EsqlQueryResponse> stopAction = client().execute(EsqlAsyncStopAction.INSTANCE, stopRequest);
// ensure stop operation is running
assertBusy(() -> {
try (EsqlQueryResponse asyncResponse = getAsyncResponse(client(), asyncExecutionId)) {
EsqlExecutionInfo executionInfo = asyncResponse.getExecutionInfo();
assertNotNull(executionInfo);
assertThat(executionInfo.isPartial(), is(true));
}
});
// allow local query to proceed
SimplePauseFieldPlugin.allowEmitting.countDown();

// Since part of the query has not been stopped, we expect some result to emerge here
try (EsqlQueryResponse asyncResponse = stopAction.actionGet(30, TimeUnit.SECONDS)) {
assertThat(asyncResponse.isRunning(), is(false));
assertThat(asyncResponse.columns().size(), equalTo(1));
assertThat(asyncResponse.values().hasNext(), is(true));
Iterator<Object> row = asyncResponse.values().next();
// sum of 0-9 squared is 285, from two remotes it's 570
assertThat(row.next(), equalTo(570L));

// run the stop query
AsyncStopRequest stopRequest = new AsyncStopRequest(asyncExecutionId);
ActionFuture<EsqlQueryResponse> stopAction = client().execute(EsqlAsyncStopAction.INSTANCE, stopRequest);
// ensure stop operation is running
assertBusy(() -> {
try (EsqlQueryResponse asyncResponse = getAsyncResponse(client(), asyncExecutionId)) {
EsqlExecutionInfo executionInfo = asyncResponse.getExecutionInfo();
assertNotNull(executionInfo);
assertThat(executionInfo.isPartial(), is(true));
}
});
// allow local query to proceed
SimplePauseFieldPlugin.allowEmitting.countDown();

// Since part of the query has not been stopped, we expect some result to emerge here
try (EsqlQueryResponse asyncResponse = stopAction.actionGet(30, TimeUnit.SECONDS)) {
assertThat(asyncResponse.isRunning(), is(false));
assertThat(asyncResponse.columns().size(), equalTo(1));
assertThat(asyncResponse.values().hasNext(), is(true));
Iterator<Object> row = asyncResponse.values().next();
// sum of 0-9 squared is 285, from two remotes it's 570
assertThat(row.next(), equalTo(570L));

EsqlExecutionInfo executionInfo = asyncResponse.getExecutionInfo();
assertNotNull(executionInfo);
assertThat(executionInfo.isCrossClusterSearch(), is(true));
long overallTookMillis = executionInfo.overallTook().millis();
assertThat(overallTookMillis, greaterThanOrEqualTo(0L));
assertThat(executionInfo.clusterAliases(), equalTo(Set.of(LOCAL_CLUSTER, REMOTE_CLUSTER_1, REMOTE_CLUSTER_2)));
assertThat(executionInfo.isPartial(), equalTo(true));
assertThat(executionInfo.isCrossClusterSearch(), is(true));
long overallTookMillis = executionInfo.overallTook().millis();
assertThat(overallTookMillis, greaterThanOrEqualTo(0L));
assertThat(executionInfo.clusterAliases(), equalTo(Set.of(LOCAL_CLUSTER, REMOTE_CLUSTER_1, REMOTE_CLUSTER_2)));
assertThat(executionInfo.isPartial(), equalTo(true));

EsqlExecutionInfo.Cluster remoteCluster = executionInfo.getCluster(REMOTE_CLUSTER_1);
assertThat(remoteCluster.getIndexExpression(), equalTo("logs-*"));
assertClusterInfoSuccess(remoteCluster, remote1NumShards);
EsqlExecutionInfo.Cluster remoteCluster = executionInfo.getCluster(REMOTE_CLUSTER_1);
assertThat(remoteCluster.getIndexExpression(), equalTo("logs-*"));
assertClusterInfoSuccess(remoteCluster, remote1NumShards);

EsqlExecutionInfo.Cluster remote2Cluster = executionInfo.getCluster(REMOTE_CLUSTER_2);
assertThat(remote2Cluster.getIndexExpression(), equalTo("logs-*"));
assertClusterInfoSuccess(remote2Cluster, remote2NumShards);
EsqlExecutionInfo.Cluster remote2Cluster = executionInfo.getCluster(REMOTE_CLUSTER_2);
assertThat(remote2Cluster.getIndexExpression(), equalTo("logs-*"));
assertClusterInfoSuccess(remote2Cluster, remote2NumShards);

EsqlExecutionInfo.Cluster localCluster = executionInfo.getCluster(LOCAL_CLUSTER);
assertThat(localCluster.getIndexExpression(), equalTo("blocking"));
assertThat(localCluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.PARTIAL));
EsqlExecutionInfo.Cluster localCluster = executionInfo.getCluster(LOCAL_CLUSTER);
assertThat(localCluster.getIndexExpression(), equalTo("blocking"));
assertThat(localCluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.PARTIAL));

assertClusterMetadataInResponse(asyncResponse, responseExpectMeta, 3);
assertClusterMetadataInResponse(asyncResponse, responseExpectMeta, 3);
}
} finally {
SimplePauseFieldPlugin.allowEmitting.countDown();
assertAcked(deleteAsyncId(client(), asyncExecutionId));
}
}
Expand All @@ -434,30 +437,33 @@ public void testStopQueryLocalNoRemotes() throws Exception {
includeCCSMetadata.v1()
);

// wait until we know that the query against 'remote-b:blocking' has started
SimplePauseFieldPlugin.startEmitting.await(30, TimeUnit.SECONDS);
try {
// wait until we know that the query against 'remote-b:blocking' has started
SimplePauseFieldPlugin.startEmitting.await(30, TimeUnit.SECONDS);

/* at this point:
* the query against the local cluster should be running because it's blocked
*/
/* at this point:
* the query against the local cluster should be running because it's blocked
*/

// run the stop query
var stopRequest = new AsyncStopRequest(asyncExecutionId);
var stopAction = client().execute(EsqlAsyncStopAction.INSTANCE, stopRequest);
// allow local query to proceed
SimplePauseFieldPlugin.allowEmitting.countDown();
// run the stop query
var stopRequest = new AsyncStopRequest(asyncExecutionId);
var stopAction = client().execute(EsqlAsyncStopAction.INSTANCE, stopRequest);
// allow local query to proceed
SimplePauseFieldPlugin.allowEmitting.countDown();

try (EsqlQueryResponse asyncResponse = stopAction.actionGet(30, TimeUnit.SECONDS)) {
assertThat(asyncResponse.isRunning(), is(false));
assertThat(asyncResponse.columns().size(), equalTo(1));
assertThat(asyncResponse.values().hasNext(), is(true));
Iterator<Object> row = asyncResponse.values().next();
assertThat((long) row.next(), greaterThanOrEqualTo(0L));
try (EsqlQueryResponse asyncResponse = stopAction.actionGet(30, TimeUnit.SECONDS)) {
assertThat(asyncResponse.isRunning(), is(false));
assertThat(asyncResponse.columns().size(), equalTo(1));
assertThat(asyncResponse.values().hasNext(), is(true));
Iterator<Object> row = asyncResponse.values().next();
assertThat((long) row.next(), greaterThanOrEqualTo(0L));

EsqlExecutionInfo executionInfo = asyncResponse.getExecutionInfo();
assertNotNull(executionInfo);
assertThat(executionInfo.isCrossClusterSearch(), is(false));
EsqlExecutionInfo executionInfo = asyncResponse.getExecutionInfo();
assertNotNull(executionInfo);
assertThat(executionInfo.isCrossClusterSearch(), is(false));
}
} finally {
SimplePauseFieldPlugin.allowEmitting.countDown();
assertAcked(deleteAsyncId(client(), asyncExecutionId));
}
}
Expand Down