Skip to content

Commit d3be47f

Browse files
committed
local cluster
1 parent 9283823 commit d3be47f

File tree

2 files changed

+80
-12
lines changed

2 files changed

+80
-12
lines changed

x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterQueryWithPartialResultsIT.java

Lines changed: 52 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
import org.elasticsearch.common.breaker.CircuitBreaker;
1515
import org.elasticsearch.common.breaker.CircuitBreakingException;
1616
import org.elasticsearch.common.settings.Settings;
17+
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
1718
import org.elasticsearch.common.util.set.Sets;
1819
import org.elasticsearch.compute.operator.exchange.ExchangeService;
1920
import org.elasticsearch.test.FailingFieldPlugin;
@@ -268,12 +269,62 @@ public void testFailToStartRequestOnRemoteCluster() throws Exception {
268269
}
269270
}
270271

272+
public void testFailSearchShardsOnLocalCluster() throws Exception {
273+
populateIndices();
274+
Exception simulatedFailure = randomFailure();
275+
for (TransportService transportService : cluster(LOCAL_CLUSTER).getInstances(TransportService.class)) {
276+
MockTransportService ts = asInstanceOf(MockTransportService.class, transportService);
277+
ts.addRequestHandlingBehavior(
278+
EsqlSearchShardsAction.NAME,
279+
(handler, request, channel, task) -> { channel.sendResponse(simulatedFailure); }
280+
);
281+
}
282+
try {
283+
EsqlQueryRequest request = new EsqlQueryRequest();
284+
request.query("FROM ok*,*a:ok* | KEEP id");
285+
request.includeCCSMetadata(randomBoolean());
286+
{
287+
request.allowPartialResults(false);
288+
var error = expectThrows(Exception.class, () -> runQuery(request).close());
289+
EsqlTestUtils.assertEsqlFailure(error);
290+
var unwrapped = ExceptionsHelper.unwrap(error, simulatedFailure.getClass());
291+
assertNotNull(unwrapped);
292+
assertThat(unwrapped.getMessage(), equalTo(simulatedFailure.getMessage()));
293+
}
294+
request.allowPartialResults(true);
295+
try (var resp = runQuery(request)) {
296+
assertTrue(resp.isPartial());
297+
List<List<Object>> rows = getValuesList(resp);
298+
Set<String> returnedIds = new HashSet<>();
299+
for (List<Object> row : rows) {
300+
assertThat(row.size(), equalTo(1));
301+
String id = (String) row.get(0);
302+
assertTrue(returnedIds.add(id));
303+
}
304+
assertThat(returnedIds, equalTo(remote1.okIds));
305+
if (request.includeCCSMetadata()) {
306+
EsqlExecutionInfo.Cluster localInfo = resp.getExecutionInfo().getCluster(LOCAL_CLUSTER);
307+
assertThat(localInfo.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.PARTIAL));
308+
309+
EsqlExecutionInfo.Cluster remoteInfo = resp.getExecutionInfo().getCluster(REMOTE_CLUSTER_1);
310+
assertThat(remoteInfo.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL));
311+
}
312+
}
313+
} finally {
314+
for (TransportService transportService : cluster(LOCAL_CLUSTER).getInstances(TransportService.class)) {
315+
MockTransportService ts = asInstanceOf(MockTransportService.class, transportService);
316+
ts.clearAllRules();
317+
}
318+
}
319+
}
320+
271321
private static Exception randomFailure() {
272322
return randomFrom(
273323
new IllegalStateException("driver was closed already"),
274324
new CircuitBreakingException("low memory", CircuitBreaker.Durability.PERMANENT),
275325
new IOException("broken disk"),
276-
new ResourceNotFoundException("exchange sink was not found")
326+
new ResourceNotFoundException("exchange sink was not found"),
327+
new EsRejectedExecutionException("node is shutting down")
277328
);
278329
}
279330

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java

Lines changed: 28 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
import org.elasticsearch.action.ActionListener;
1111
import org.elasticsearch.action.OriginalIndices;
1212
import org.elasticsearch.action.search.SearchRequest;
13+
import org.elasticsearch.action.search.ShardSearchFailure;
1314
import org.elasticsearch.cluster.service.ClusterService;
1415
import org.elasticsearch.common.util.BigArrays;
1516
import org.elasticsearch.common.util.concurrent.RunOnce;
@@ -213,15 +214,18 @@ public void execute(
213214
cancelQueryOnFailure,
214215
computeListener.acquireCompute().delegateFailure((l, profiles) -> {
215216
if (execInfo.clusterInfo.containsKey(LOCAL_CLUSTER)) {
216-
var tookTime = TimeValue.timeValueNanos(System.nanoTime() - execInfo.getRelativeStartNanos());
217-
final Integer failedShards = execInfo.getCluster(LOCAL_CLUSTER).getFailedShards();
218-
var status = localClusterWasInterrupted.get() || (failedShards != null && failedShards > 0)
219-
? EsqlExecutionInfo.Cluster.Status.PARTIAL
220-
: EsqlExecutionInfo.Cluster.Status.SUCCESSFUL;
221-
execInfo.swapCluster(
222-
LOCAL_CLUSTER,
223-
(k, v) -> new EsqlExecutionInfo.Cluster.Builder(v).setStatus(status).setTook(tookTime).build()
224-
);
217+
execInfo.swapCluster(LOCAL_CLUSTER, (k, v) -> {
218+
var tookTime = TimeValue.timeValueNanos(System.nanoTime() - execInfo.getRelativeStartNanos());
219+
var builder = new EsqlExecutionInfo.Cluster.Builder(v).setTook(tookTime);
220+
if (v.getStatus() == EsqlExecutionInfo.Cluster.Status.RUNNING) {
221+
final Integer failedShards = execInfo.getCluster(LOCAL_CLUSTER).getFailedShards();
222+
var status = localClusterWasInterrupted.get() || (failedShards != null && failedShards > 0)
223+
? EsqlExecutionInfo.Cluster.Status.PARTIAL
224+
: EsqlExecutionInfo.Cluster.Status.SUCCESSFUL;
225+
builder.setStatus(status);
226+
}
227+
return builder.build();
228+
});
225229
}
226230
l.onResponse(profiles);
227231
})
@@ -244,6 +248,7 @@ public void execute(
244248
);
245249
// starts computes on data nodes on the main cluster
246250
if (localConcreteIndices != null && localConcreteIndices.indices().length > 0) {
251+
final var dataNodesListener = localListener.acquireCompute();
247252
dataNodeComputeHandler.startComputeOnDataNodes(
248253
sessionId,
249254
LOCAL_CLUSTER,
@@ -254,7 +259,7 @@ public void execute(
254259
localOriginalIndices,
255260
exchangeSource,
256261
cancelQueryOnFailure,
257-
localListener.acquireCompute().map(r -> {
262+
ActionListener.wrap(r -> {
258263
localClusterWasInterrupted.set(execInfo.isStopped());
259264
execInfo.swapCluster(
260265
LOCAL_CLUSTER,
@@ -264,7 +269,19 @@ public void execute(
264269
.setFailedShards(r.getFailedShards())
265270
.build()
266271
);
267-
return r.getProfiles();
272+
dataNodesListener.onResponse(r.getProfiles());
273+
}, e -> {
274+
if (configuration.allowPartialResults()) {
275+
execInfo.swapCluster(
276+
LOCAL_CLUSTER,
277+
(k, v) -> new EsqlExecutionInfo.Cluster.Builder(v).setStatus(
278+
EsqlExecutionInfo.Cluster.Status.PARTIAL
279+
).setFailures(List.of(new ShardSearchFailure(e))).build()
280+
);
281+
dataNodesListener.onResponse(List.of());
282+
} else {
283+
dataNodesListener.onFailure(e);
284+
}
268285
})
269286
);
270287
}

0 commit comments

Comments
 (0)