Skip to content

Commit 2e40166

Browse files
committed
return partial_results
1 parent 653d080 commit 2e40166

File tree

11 files changed

+114
-41
lines changed

11 files changed

+114
-41
lines changed

x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/EsqlTestUtils.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -389,7 +389,7 @@ public static Configuration configuration(QueryPragmas pragmas, String query) {
389389
false,
390390
TABLES,
391391
System.nanoTime(),
392-
false
392+
false
393393
);
394394
}
395395

x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/AbstractEsqlIntegTestCase.java

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -150,9 +150,6 @@ protected final EsqlQueryResponse run(String esqlCommands, QueryPragmas pragmas)
150150
protected EsqlQueryResponse run(String esqlCommands, QueryPragmas pragmas, QueryBuilder filter) {
151151
EsqlQueryRequest request = EsqlQueryRequest.syncEsqlQueryRequest();
152152
request.query(esqlCommands);
153-
if (randomBoolean()) {
154-
request.allowPartialResults(randomBoolean());
155-
}
156153
if (pragmas != null) {
157154
request.pragmas(pragmas);
158155
}

x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlActionBreakerIT.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,17 @@ private EsqlQueryResponse runWithBreaking(EsqlQueryRequest request) throws Circu
9696
@Override
9797
protected EsqlQueryResponse run(EsqlQueryRequest request) {
9898
try {
99-
return runWithBreaking(request);
99+
if (randomBoolean()) {
100+
request.allowPartialResults(randomBoolean());
101+
}
102+
try (EsqlQueryResponse resp = runWithBreaking(request)) {
103+
if (resp.isPartial()) {
104+
assertTrue(request.allowPartialResults());
105+
} else {
106+
resp.incRef();
107+
return resp;
108+
}
109+
}
100110
} catch (Exception e) {
101111
try (EsqlQueryResponse resp = super.run(request)) {
102112
assertThat(e, instanceOf(CircuitBreakingException.class));
@@ -105,6 +115,7 @@ protected EsqlQueryResponse run(EsqlQueryRequest request) {
105115
return resp;
106116
}
107117
}
118+
return super.run(request);
108119
}
109120

110121
/**

x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlDisruptionIT.java

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -84,16 +84,31 @@ private EsqlQueryResponse runQueryWithDisruption(EsqlQueryRequest request) {
8484
logger.info("--> start disruption scheme [{}]", disruptionScheme);
8585
disruptionScheme.startDisrupting();
8686
logger.info("--> executing esql query with disruption {} ", request.query());
87+
if (randomBoolean()) {
88+
request.allowPartialResults(randomBoolean());
89+
}
8790
ActionFuture<EsqlQueryResponse> future = client().execute(EsqlQueryAction.INSTANCE, request);
8891
try {
89-
return future.actionGet(2, TimeUnit.MINUTES);
92+
try (var resp = future.actionGet(2, TimeUnit.MINUTES)) {
93+
if (resp.isPartial()) {
94+
assertTrue(request.allowPartialResults());
95+
} else {
96+
resp.incRef();
97+
return resp;
98+
}
99+
}
90100
} catch (Exception ignored) {
91101

92102
} finally {
93103
clearDisruption();
94104
}
95-
try {
96-
return future.actionGet(2, TimeUnit.MINUTES);
105+
try (EsqlQueryResponse resp = future.actionGet(2, TimeUnit.MINUTES)) {
106+
if (resp.isPartial()) {
107+
assertTrue(request.allowPartialResults());
108+
} else {
109+
resp.incRef();
110+
return resp;
111+
}
97112
} catch (Exception e) {
98113
logger.info(
99114
"running tasks: {}",
@@ -114,9 +129,9 @@ private EsqlQueryResponse runQueryWithDisruption(EsqlQueryRequest request) {
114129
ensureBlocksReleased();
115130
logger.info("--> failed to execute esql query with disruption; retrying...", e);
116131
EsqlTestUtils.assertEsqlFailure(e);
117-
request.allowPartialResults(false);
118132
return client().execute(EsqlQueryAction.INSTANCE, request).actionGet(2, TimeUnit.MINUTES);
119133
}
134+
return super.run(request);
120135
}
121136

122137
private ServiceDisruptionScheme addRandomDisruptionScheme() {

x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlNodeFailureIT.java

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,15 @@
1616
import org.elasticsearch.test.FailingFieldPlugin;
1717
import org.elasticsearch.xcontent.XContentBuilder;
1818
import org.elasticsearch.xcontent.json.JsonXContent;
19+
import org.elasticsearch.xpack.esql.EsqlTestUtils;
1920

2021
import java.io.IOException;
2122
import java.util.ArrayList;
2223
import java.util.Collection;
2324
import java.util.List;
2425

2526
import static org.hamcrest.Matchers.equalTo;
27+
import static org.hamcrest.Matchers.lessThanOrEqualTo;
2628

2729
/**
2830
* Make sure the failures on the data node come back as failures over the wire.
@@ -73,7 +75,23 @@ public void testFailureLoadingFields() throws IOException {
7375
docs.add(client().prepareIndex("fail").setSource("foo", 0));
7476
indexRandom(true, docs);
7577

76-
IllegalStateException e = expectThrows(IllegalStateException.class, () -> run("FROM fail,ok | LIMIT 100").close());
77-
assertThat(e.getMessage(), equalTo("Accessing failing field"));
78+
// without partial results
79+
String query = "FROM fail,ok | LIMIT 100";
80+
{
81+
IllegalStateException e = expectThrows(IllegalStateException.class, () -> run(query).close());
82+
assertThat(e.getMessage(), equalTo("Accessing failing field"));
83+
}
84+
// allow partial results
85+
{
86+
EsqlQueryRequest request = new EsqlQueryRequest();
87+
request.query(query);
88+
request.allowPartialResults(true);
89+
request.pragmas(randomPragmas());
90+
try (EsqlQueryResponse resp = run(request)) {
91+
assertTrue(resp.isPartial());
92+
List<List<Object>> rows = EsqlTestUtils.getValuesList(resp);
93+
assertThat(rows.size(), lessThanOrEqualTo(50));
94+
}
95+
}
7896
}
7997
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlExecutionInfo.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -174,6 +174,9 @@ public TimeValue planningTookTime() {
174174
public void markEndQuery() {
175175
assert relativeStartNanos != null : "Relative start time must be set when markEndQuery is called";
176176
overallTook = new TimeValue(System.nanoTime() - relativeStartNanos, TimeUnit.NANOSECONDS);
177+
if (isPartial == false) {
178+
isPartial = clusterInfo.values().stream().anyMatch(c -> c.failedShards != null && c.failedShards > 0);
179+
}
177180
}
178181

179182
// for testing only - use markEndQuery in production code

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlQueryResponse.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -181,6 +181,10 @@ public boolean isAsync() {
181181
return isRunning;
182182
}
183183

184+
public boolean isPartial() {
185+
return executionInfo != null && executionInfo.isPartial();
186+
}
187+
184188
public EsqlExecutionInfo getExecutionInfo() {
185189
return executionInfo;
186190
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java

Lines changed: 13 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -209,11 +209,11 @@ public void execute(
209209
transportService.getThreadPool(),
210210
cancelQueryOnFailure,
211211
computeListener.acquireCompute().delegateFailure((l, profiles) -> {
212-
if (execInfo.isCrossClusterSearch() && execInfo.clusterAliases().contains(LOCAL_CLUSTER)) {
213-
var tookTime = TimeValue.timeValueNanos(System.nanoTime() - execInfo.getRelativeStartNanos());
214-
var status = localClusterWasInterrupted.get()
215-
? EsqlExecutionInfo.Cluster.Status.PARTIAL
216-
: EsqlExecutionInfo.Cluster.Status.SUCCESSFUL;
212+
var tookTime = TimeValue.timeValueNanos(System.nanoTime() - execInfo.getRelativeStartNanos());
213+
var status = localClusterWasInterrupted.get()
214+
? EsqlExecutionInfo.Cluster.Status.PARTIAL
215+
: EsqlExecutionInfo.Cluster.Status.SUCCESSFUL;
216+
if (execInfo.clusterInfo.containsKey(LOCAL_CLUSTER)) {
217217
execInfo.swapCluster(
218218
LOCAL_CLUSTER,
219219
(k, v) -> new EsqlExecutionInfo.Cluster.Builder(v).setStatus(status).setTook(tookTime).build()
@@ -252,16 +252,14 @@ public void execute(
252252
cancelQueryOnFailure,
253253
localListener.acquireCompute().map(r -> {
254254
localClusterWasInterrupted.set(execInfo.isPartial());
255-
if (execInfo.isCrossClusterSearch() && execInfo.clusterAliases().contains(LOCAL_CLUSTER)) {
256-
execInfo.swapCluster(
257-
LOCAL_CLUSTER,
258-
(k, v) -> new EsqlExecutionInfo.Cluster.Builder(v).setTotalShards(r.getTotalShards())
259-
.setSuccessfulShards(r.getSuccessfulShards())
260-
.setSkippedShards(r.getSkippedShards())
261-
.setFailedShards(r.getFailedShards())
262-
.build()
263-
);
264-
}
255+
execInfo.swapCluster(
256+
LOCAL_CLUSTER,
257+
(k, v) -> new EsqlExecutionInfo.Cluster.Builder(v).setTotalShards(r.getTotalShards())
258+
.setSuccessfulShards(r.getSuccessfulShards())
259+
.setSkippedShards(r.getSkippedShards())
260+
.setFailedShards(r.getFailedShards())
261+
.build()
262+
);
265263
return r.getProfiles();
266264
})
267265
);

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeComputeHandler.java

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -127,13 +127,32 @@ protected void sendRequest(
127127
esqlExecutor,
128128
listener.delegateFailureAndWrap((l, unused) -> {
129129
final AtomicReference<DataNodeComputeResponse> nodeResponseRef = new AtomicReference<>();
130+
final Runnable onGroupFailure;
131+
if (allowPartialResults) {
132+
CancellableTask groupTask = RemoteListenerGroup.createGroupTask(
133+
transportService,
134+
parentTask,
135+
() -> "compute group, data-node [" + node.getName() + "], " + shardIds + " [" + shardIds + "]"
136+
);
137+
onGroupFailure = computeService.cancelQueryOnFailure(groupTask);
138+
l = ActionListener.runAfter(l, () -> transportService.getTaskManager().unregister(groupTask));
139+
} else {
140+
onGroupFailure = () -> {
141+
if (nodeResponseRef.get() != null) {
142+
exchangeService.finishSinkHandler(
143+
childSessionId,
144+
new TaskCancelledException(parentTask.getReasonCancelled())
145+
);
146+
}
147+
};
148+
}
130149
try (
131-
var computeListener = new ComputeListener(threadPool, runOnTaskFailure, l.map(ignored -> nodeResponseRef.get()))
150+
var computeListener = new ComputeListener(threadPool, onGroupFailure, l.map(ignored -> nodeResponseRef.get()))
132151
) {
133152
final var remoteSink = exchangeService.newRemoteSink(parentTask, childSessionId, transportService, connection);
134153
exchangeSource.addRemoteSink(
135154
remoteSink,
136-
true,
155+
allowPartialResults == false,
137156
pagesFetched::incrementAndGet,
138157
queryPragmas.concurrentExchangeClients(),
139158
computeListener.acquireAvoid()
@@ -239,6 +258,7 @@ public void onFailure(Exception e) {
239258
}
240259
onResponse(List.of());
241260
} else {
261+
// TODO: fatal vs non-fatal
242262
try {
243263
exchangeService.finishSinkHandler(request.sessionId(), e);
244264
} finally {

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequestSender.java

Lines changed: 17 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -82,13 +82,14 @@ final void startComputeOnDataNodes(
8282
searchShards(rootTask, clusterAlias, requestFilter, concreteIndices, originalIndices, ActionListener.wrap(targetShards -> {
8383
try (var computeListener = new ComputeListener(transportService.getThreadPool(), runOnTaskFailure, listener.map(profiles -> {
8484
TimeValue took = TimeValue.timeValueNanos(System.nanoTime() - startTimeInNanos);
85+
final int failedShards = shardFailures.size();
8586
return new ComputeResponse(
8687
profiles,
8788
took,
8889
targetShards.totalShards(),
89-
targetShards.totalShards(),
90+
targetShards.totalShards() - failedShards,
9091
targetShards.skippedShards(),
91-
0
92+
failedShards
9293
);
9394
}))) {
9495
for (TargetShard shard : targetShards.shards.values()) {
@@ -111,21 +112,26 @@ private void trySendingRequestsForPendingShards(TargetShards targetShards, Compu
111112
if (changed.compareAndSet(true, false) == false) {
112113
break;
113114
}
115+
for (ShardId shardId : pendingShardIds) {
116+
if (targetShards.getShard(shardId).remainingNodes.isEmpty()) shardFailures.compute(
117+
shardId,
118+
(k, v) -> new ShardFailure(
119+
true,
120+
v == null ? new NoShardAvailableActionException(shardId, "no shard copies found") : v.failure
121+
)
122+
);
123+
}
124+
// remove shards with fatal failure from pending shards
114125
final Iterator<ShardId> shardIts = pendingShardIds.iterator();
115126
while (shardIts.hasNext()) {
116127
final ShardId shardId = shardIts.next();
117-
if (targetShards.getShard(shardId).remainingNodes.isEmpty()) {
118-
shardFailures.compute(
119-
shardId,
120-
(k, v) -> new ShardFailure(
121-
true,
122-
v == null ? new NoShardAvailableActionException(shardId, "no shard copies found") : v.failure
123-
)
124-
);
128+
final ShardFailure failure = shardFailures.get(shardId);
129+
if (failure != null && failure.fatal) {
125130
shardIts.remove();
126131
}
127132
}
128-
if (reportedFailure || shardFailures.values().stream().anyMatch(shardFailure -> shardFailure.fatal)) {
133+
if (reportedFailure
134+
|| (allowPartialResults == false && shardFailures.values().stream().anyMatch(shardFailure -> shardFailure.fatal))) {
129135
reportedFailure = true;
130136
reportFailures(computeListener);
131137
} else {

0 commit comments

Comments
 (0)