Skip to content

Commit 983c4d7

Browse files
authored
Improve CrossClusterAsyncQueryStopIT test resiliency (#122219) (#122302)
1 parent 4a9093d commit 983c4d7

File tree

1 file changed

+144
-132
lines changed

1 file changed

+144
-132
lines changed

x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterAsyncQueryStopIT.java

Lines changed: 144 additions & 132 deletions
Original file line numberDiff line numberDiff line change
@@ -53,65 +53,71 @@ public void testStopQuery() throws Exception {
5353
includeCCSMetadata.v1(),
5454
Map.of("page_size", 1, "data_partitioning", "shard", "task_concurrency", 1)
5555
);
56+
try {
57+
// wait until we know that the query against 'remote-b:blocking' has started
58+
CountingPauseFieldPlugin.startEmitting.await(30, TimeUnit.SECONDS);
59+
60+
// wait until the query of 'cluster-a:logs-*' has finished (it is not blocked since we are not searching the 'blocking' index on
61+
// it)
62+
waitForCluster(client(), REMOTE_CLUSTER_1, asyncExecutionId);
63+
waitForCluster(client(), LOCAL_CLUSTER, asyncExecutionId);
64+
65+
/* at this point:
66+
* the query against cluster-a should be finished
67+
* the query against remote-b should be running (blocked on the PauseFieldPlugin.allowEmitting CountDown)
68+
* the query against the local cluster should be running because it has a STATS clause that needs to wait on remote-b
69+
*/
70+
71+
// run the stop query
72+
AsyncStopRequest stopRequest = new AsyncStopRequest(asyncExecutionId);
73+
ActionFuture<EsqlQueryResponse> stopAction = client().execute(EsqlAsyncStopAction.INSTANCE, stopRequest);
74+
assertBusy(() -> {
75+
List<TaskInfo> tasks = getDriverTasks(client(REMOTE_CLUSTER_2));
76+
List<TaskInfo> reduceTasks = tasks.stream()
77+
.filter(t -> t.description().contains("_LuceneSourceOperator") == false)
78+
.toList();
79+
assertThat(reduceTasks, empty());
80+
});
81+
// allow remoteB query to proceed
82+
CountingPauseFieldPlugin.allowEmitting.countDown();
83+
84+
// Since part of the query has not been stopped, we expect some result to emerge here
85+
try (EsqlQueryResponse asyncResponse = stopAction.actionGet(30, TimeUnit.SECONDS)) {
86+
// Check that we did not process all the fields on remote-b
87+
// Should not be getting more than one page here, and we set page size to 1
88+
assertThat(CountingPauseFieldPlugin.count.get(), lessThanOrEqualTo(1L));
89+
assertThat(asyncResponse.isRunning(), is(false));
90+
assertThat(asyncResponse.columns().size(), equalTo(1));
91+
assertThat(asyncResponse.values().hasNext(), is(true));
92+
Iterator<Object> row = asyncResponse.values().next();
93+
// sum of 0-9 is 45, and sum of 0-9 squared is 285
94+
assertThat(row.next(), equalTo(330L));
5695

57-
// wait until we know that the query against 'remote-b:blocking' has started
58-
CountingPauseFieldPlugin.startEmitting.await(30, TimeUnit.SECONDS);
59-
60-
// wait until the query of 'cluster-a:logs-*' has finished (it is not blocked since we are not searching the 'blocking' index on it)
61-
waitForCluster(client(), REMOTE_CLUSTER_1, asyncExecutionId);
62-
waitForCluster(client(), LOCAL_CLUSTER, asyncExecutionId);
63-
64-
/* at this point:
65-
* the query against cluster-a should be finished
66-
* the query against remote-b should be running (blocked on the PauseFieldPlugin.allowEmitting CountDown)
67-
* the query against the local cluster should be running because it has a STATS clause that needs to wait on remote-b
68-
*/
69-
70-
// run the stop query
71-
AsyncStopRequest stopRequest = new AsyncStopRequest(asyncExecutionId);
72-
ActionFuture<EsqlQueryResponse> stopAction = client().execute(EsqlAsyncStopAction.INSTANCE, stopRequest);
73-
assertBusy(() -> {
74-
List<TaskInfo> tasks = getDriverTasks(client(REMOTE_CLUSTER_2));
75-
List<TaskInfo> reduceTasks = tasks.stream().filter(t -> t.description().contains("_LuceneSourceOperator") == false).toList();
76-
assertThat(reduceTasks, empty());
77-
});
78-
// allow remoteB query to proceed
79-
CountingPauseFieldPlugin.allowEmitting.countDown();
80-
81-
// Since part of the query has not been stopped, we expect some result to emerge here
82-
try (EsqlQueryResponse asyncResponse = stopAction.actionGet(30, TimeUnit.SECONDS)) {
83-
// Check that we did not process all the fields on remote-b
84-
// Should not be getting more than one page here, and we set page size to 1
85-
assertThat(CountingPauseFieldPlugin.count.get(), lessThanOrEqualTo(1L));
86-
assertThat(asyncResponse.isRunning(), is(false));
87-
assertThat(asyncResponse.columns().size(), equalTo(1));
88-
assertThat(asyncResponse.values().hasNext(), is(true));
89-
Iterator<Object> row = asyncResponse.values().next();
90-
// sum of 0-9 is 45, and sum of 0-9 squared is 285
91-
assertThat(row.next(), equalTo(330L));
92-
93-
EsqlExecutionInfo executionInfo = asyncResponse.getExecutionInfo();
94-
assertNotNull(executionInfo);
95-
assertThat(executionInfo.isCrossClusterSearch(), is(true));
96-
long overallTookMillis = executionInfo.overallTook().millis();
97-
assertThat(overallTookMillis, greaterThanOrEqualTo(0L));
98-
assertThat(executionInfo.clusterAliases(), equalTo(Set.of(LOCAL_CLUSTER, REMOTE_CLUSTER_1, REMOTE_CLUSTER_2)));
99-
assertThat(executionInfo.isPartial(), equalTo(true));
100-
101-
EsqlExecutionInfo.Cluster remoteCluster = executionInfo.getCluster(REMOTE_CLUSTER_1);
102-
assertThat(remoteCluster.getIndexExpression(), equalTo("logs-*"));
103-
assertClusterInfoSuccess(remoteCluster, remote1NumShards);
104-
105-
EsqlExecutionInfo.Cluster remote2Cluster = executionInfo.getCluster(REMOTE_CLUSTER_2);
106-
assertThat(remote2Cluster.getIndexExpression(), equalTo("blocking"));
107-
assertThat(remote2Cluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.PARTIAL));
108-
109-
EsqlExecutionInfo.Cluster localCluster = executionInfo.getCluster(LOCAL_CLUSTER);
110-
assertThat(localCluster.getIndexExpression(), equalTo("logs-*"));
111-
assertClusterInfoSuccess(localCluster, localNumShards);
112-
113-
assertClusterMetadataInResponse(asyncResponse, responseExpectMeta, 3);
96+
EsqlExecutionInfo executionInfo = asyncResponse.getExecutionInfo();
97+
assertNotNull(executionInfo);
98+
assertThat(executionInfo.isCrossClusterSearch(), is(true));
99+
long overallTookMillis = executionInfo.overallTook().millis();
100+
assertThat(overallTookMillis, greaterThanOrEqualTo(0L));
101+
assertThat(executionInfo.clusterAliases(), equalTo(Set.of(LOCAL_CLUSTER, REMOTE_CLUSTER_1, REMOTE_CLUSTER_2)));
102+
assertThat(executionInfo.isPartial(), equalTo(true));
103+
104+
EsqlExecutionInfo.Cluster remoteCluster = executionInfo.getCluster(REMOTE_CLUSTER_1);
105+
assertThat(remoteCluster.getIndexExpression(), equalTo("logs-*"));
106+
assertClusterInfoSuccess(remoteCluster, remote1NumShards);
107+
108+
EsqlExecutionInfo.Cluster remote2Cluster = executionInfo.getCluster(REMOTE_CLUSTER_2);
109+
assertThat(remote2Cluster.getIndexExpression(), equalTo("blocking"));
110+
assertThat(remote2Cluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.PARTIAL));
111+
112+
EsqlExecutionInfo.Cluster localCluster = executionInfo.getCluster(LOCAL_CLUSTER);
113+
assertThat(localCluster.getIndexExpression(), equalTo("logs-*"));
114+
assertClusterInfoSuccess(localCluster, localNumShards);
115+
116+
assertClusterMetadataInResponse(asyncResponse, responseExpectMeta, 3);
117+
}
114118
} finally {
119+
// Ensure proper cleanup if the test fails
120+
CountingPauseFieldPlugin.allowEmitting.countDown();
115121
assertAcked(deleteAsyncId(client(), asyncExecutionId));
116122
}
117123
}
@@ -131,63 +137,66 @@ public void testStopQueryLocal() throws Exception {
131137
includeCCSMetadata.v1()
132138
);
133139

134-
// wait until we know that the query against 'remote-b:blocking' has started
135-
SimplePauseFieldPlugin.startEmitting.await(30, TimeUnit.SECONDS);
136-
137-
// wait until the remotes are done
138-
waitForCluster(client(), REMOTE_CLUSTER_1, asyncExecutionId);
139-
waitForCluster(client(), REMOTE_CLUSTER_2, asyncExecutionId);
140+
try {
141+
// wait until we know that the query against 'remote-b:blocking' has started
142+
SimplePauseFieldPlugin.startEmitting.await(30, TimeUnit.SECONDS);
143+
144+
// wait until the remotes are done
145+
waitForCluster(client(), REMOTE_CLUSTER_1, asyncExecutionId);
146+
waitForCluster(client(), REMOTE_CLUSTER_2, asyncExecutionId);
147+
148+
/* at this point:
149+
* the query against remotes should be finished
150+
* the query against the local cluster should be running because it's blocked
151+
*/
152+
153+
// run the stop query
154+
AsyncStopRequest stopRequest = new AsyncStopRequest(asyncExecutionId);
155+
ActionFuture<EsqlQueryResponse> stopAction = client().execute(EsqlAsyncStopAction.INSTANCE, stopRequest);
156+
// ensure stop operation is running
157+
assertBusy(() -> {
158+
try (EsqlQueryResponse asyncResponse = getAsyncResponse(client(), asyncExecutionId)) {
159+
EsqlExecutionInfo executionInfo = asyncResponse.getExecutionInfo();
160+
assertNotNull(executionInfo);
161+
assertThat(executionInfo.isPartial(), is(true));
162+
}
163+
});
164+
// allow local query to proceed
165+
SimplePauseFieldPlugin.allowEmitting.countDown();
166+
167+
// Since part of the query has not been stopped, we expect some result to emerge here
168+
try (EsqlQueryResponse asyncResponse = stopAction.actionGet(30, TimeUnit.SECONDS)) {
169+
assertThat(asyncResponse.isRunning(), is(false));
170+
assertThat(asyncResponse.columns().size(), equalTo(1));
171+
assertThat(asyncResponse.values().hasNext(), is(true));
172+
Iterator<Object> row = asyncResponse.values().next();
173+
// sum of 0-9 squared is 285, from two remotes it's 570
174+
assertThat(row.next(), equalTo(570L));
140175

141-
/* at this point:
142-
* the query against remotes should be finished
143-
* the query against the local cluster should be running because it's blocked
144-
*/
145-
146-
// run the stop query
147-
AsyncStopRequest stopRequest = new AsyncStopRequest(asyncExecutionId);
148-
ActionFuture<EsqlQueryResponse> stopAction = client().execute(EsqlAsyncStopAction.INSTANCE, stopRequest);
149-
// ensure stop operation is running
150-
assertBusy(() -> {
151-
try (EsqlQueryResponse asyncResponse = getAsyncResponse(client(), asyncExecutionId)) {
152176
EsqlExecutionInfo executionInfo = asyncResponse.getExecutionInfo();
153177
assertNotNull(executionInfo);
154-
assertThat(executionInfo.isPartial(), is(true));
178+
assertThat(executionInfo.isCrossClusterSearch(), is(true));
179+
long overallTookMillis = executionInfo.overallTook().millis();
180+
assertThat(overallTookMillis, greaterThanOrEqualTo(0L));
181+
assertThat(executionInfo.clusterAliases(), equalTo(Set.of(LOCAL_CLUSTER, REMOTE_CLUSTER_1, REMOTE_CLUSTER_2)));
182+
assertThat(executionInfo.isPartial(), equalTo(true));
183+
184+
EsqlExecutionInfo.Cluster remoteCluster = executionInfo.getCluster(REMOTE_CLUSTER_1);
185+
assertThat(remoteCluster.getIndexExpression(), equalTo("logs-*"));
186+
assertClusterInfoSuccess(remoteCluster, remote1NumShards);
187+
188+
EsqlExecutionInfo.Cluster remote2Cluster = executionInfo.getCluster(REMOTE_CLUSTER_2);
189+
assertThat(remote2Cluster.getIndexExpression(), equalTo("logs-*"));
190+
assertClusterInfoSuccess(remote2Cluster, remote2NumShards);
191+
192+
EsqlExecutionInfo.Cluster localCluster = executionInfo.getCluster(LOCAL_CLUSTER);
193+
assertThat(localCluster.getIndexExpression(), equalTo("blocking"));
194+
assertThat(localCluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.PARTIAL));
195+
196+
assertClusterMetadataInResponse(asyncResponse, responseExpectMeta, 3);
155197
}
156-
});
157-
// allow local query to proceed
158-
SimplePauseFieldPlugin.allowEmitting.countDown();
159-
160-
// Since part of the query has not been stopped, we expect some result to emerge here
161-
try (EsqlQueryResponse asyncResponse = stopAction.actionGet(30, TimeUnit.SECONDS)) {
162-
assertThat(asyncResponse.isRunning(), is(false));
163-
assertThat(asyncResponse.columns().size(), equalTo(1));
164-
assertThat(asyncResponse.values().hasNext(), is(true));
165-
Iterator<Object> row = asyncResponse.values().next();
166-
// sum of 0-9 squared is 285, from two remotes it's 570
167-
assertThat(row.next(), equalTo(570L));
168-
169-
EsqlExecutionInfo executionInfo = asyncResponse.getExecutionInfo();
170-
assertNotNull(executionInfo);
171-
assertThat(executionInfo.isCrossClusterSearch(), is(true));
172-
long overallTookMillis = executionInfo.overallTook().millis();
173-
assertThat(overallTookMillis, greaterThanOrEqualTo(0L));
174-
assertThat(executionInfo.clusterAliases(), equalTo(Set.of(LOCAL_CLUSTER, REMOTE_CLUSTER_1, REMOTE_CLUSTER_2)));
175-
assertThat(executionInfo.isPartial(), equalTo(true));
176-
177-
EsqlExecutionInfo.Cluster remoteCluster = executionInfo.getCluster(REMOTE_CLUSTER_1);
178-
assertThat(remoteCluster.getIndexExpression(), equalTo("logs-*"));
179-
assertClusterInfoSuccess(remoteCluster, remote1NumShards);
180-
181-
EsqlExecutionInfo.Cluster remote2Cluster = executionInfo.getCluster(REMOTE_CLUSTER_2);
182-
assertThat(remote2Cluster.getIndexExpression(), equalTo("logs-*"));
183-
assertClusterInfoSuccess(remote2Cluster, remote2NumShards);
184-
185-
EsqlExecutionInfo.Cluster localCluster = executionInfo.getCluster(LOCAL_CLUSTER);
186-
assertThat(localCluster.getIndexExpression(), equalTo("blocking"));
187-
assertThat(localCluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.PARTIAL));
188-
189-
assertClusterMetadataInResponse(asyncResponse, responseExpectMeta, 3);
190198
} finally {
199+
SimplePauseFieldPlugin.allowEmitting.countDown();
191200
assertAcked(deleteAsyncId(client(), asyncExecutionId));
192201
}
193202
}
@@ -205,30 +214,33 @@ public void testStopQueryLocalNoRemotes() throws Exception {
205214
includeCCSMetadata.v1()
206215
);
207216

208-
// wait until we know that the query against 'remote-b:blocking' has started
209-
SimplePauseFieldPlugin.startEmitting.await(30, TimeUnit.SECONDS);
210-
211-
/* at this point:
212-
* the query against the local cluster should be running because it's blocked
213-
*/
214-
215-
// run the stop query
216-
var stopRequest = new AsyncStopRequest(asyncExecutionId);
217-
var stopAction = client().execute(EsqlAsyncStopAction.INSTANCE, stopRequest);
218-
// allow local query to proceed
219-
SimplePauseFieldPlugin.allowEmitting.countDown();
220-
221-
try (EsqlQueryResponse asyncResponse = stopAction.actionGet(30, TimeUnit.SECONDS)) {
222-
assertThat(asyncResponse.isRunning(), is(false));
223-
assertThat(asyncResponse.columns().size(), equalTo(1));
224-
assertThat(asyncResponse.values().hasNext(), is(true));
225-
Iterator<Object> row = asyncResponse.values().next();
226-
assertThat((long) row.next(), greaterThanOrEqualTo(0L));
227-
228-
EsqlExecutionInfo executionInfo = asyncResponse.getExecutionInfo();
229-
assertNotNull(executionInfo);
230-
assertThat(executionInfo.isCrossClusterSearch(), is(false));
217+
try {
218+
// wait until we know that the query against 'remote-b:blocking' has started
219+
SimplePauseFieldPlugin.startEmitting.await(30, TimeUnit.SECONDS);
220+
221+
/* at this point:
222+
* the query against the local cluster should be running because it's blocked
223+
*/
224+
225+
// run the stop query
226+
var stopRequest = new AsyncStopRequest(asyncExecutionId);
227+
var stopAction = client().execute(EsqlAsyncStopAction.INSTANCE, stopRequest);
228+
// allow local query to proceed
229+
SimplePauseFieldPlugin.allowEmitting.countDown();
230+
231+
try (EsqlQueryResponse asyncResponse = stopAction.actionGet(30, TimeUnit.SECONDS)) {
232+
assertThat(asyncResponse.isRunning(), is(false));
233+
assertThat(asyncResponse.columns().size(), equalTo(1));
234+
assertThat(asyncResponse.values().hasNext(), is(true));
235+
Iterator<Object> row = asyncResponse.values().next();
236+
assertThat((long) row.next(), greaterThanOrEqualTo(0L));
237+
238+
EsqlExecutionInfo executionInfo = asyncResponse.getExecutionInfo();
239+
assertNotNull(executionInfo);
240+
assertThat(executionInfo.isCrossClusterSearch(), is(false));
241+
}
231242
} finally {
243+
SimplePauseFieldPlugin.allowEmitting.countDown();
232244
assertAcked(deleteAsyncId(client(), asyncExecutionId));
233245
}
234246
}

0 commit comments

Comments
 (0)