Skip to content

Commit b07ba89

Browse files
authored
Support partial results in ES|QL (#121942)
This change introduces partial results in ES|QL. To minimize the scope of the changes, this PR is just the first step toward full support for partial results. The following follow-up tasks are required: - Support partial results across clusters - Return shard-level failures (currently, we only return the `is_partial` flag) - Add documentation - Allow partial results during resolution
1 parent d264f7a commit b07ba89

File tree

25 files changed

+305
-116
lines changed

25 files changed

+305
-116
lines changed

docs/changelog/121942.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 121942
2+
summary: Allow partial results in ES|QL
3+
area: ES|QL
4+
type: enhancement
5+
issues: []

server/src/main/java/org/elasticsearch/TransportVersions.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -192,6 +192,7 @@ static TransportVersion def(int id) {
192192
public static final TransportVersion ESQL_LOOKUP_JOIN_SOURCE_TEXT = def(9_008_0_00);
193193
public static final TransportVersion REMOVE_ALL_APPLICABLE_SELECTOR = def(9_009_0_00);
194194
public static final TransportVersion SLM_UNHEALTHY_IF_NO_SNAPSHOT_WITHIN = def(9_010_0_00);
195+
public static final TransportVersion ESQL_SUPPORT_PARTIAL_RESULTS = def(9_011_0_00);
195196

196197
/*
197198
* STOP! READ THIS FIRST! No, really,

x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/ConfigurationTestUtils.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,8 @@ public static Configuration randomConfiguration(String query, Map<String, Map<St
7171
query,
7272
profile,
7373
tables,
74-
System.nanoTime()
74+
System.nanoTime(),
75+
false
7576
);
7677
}
7778

x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/EsqlTestUtils.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -391,7 +391,8 @@ public static Configuration configuration(QueryPragmas pragmas, String query) {
391391
query,
392392
false,
393393
TABLES,
394-
System.nanoTime()
394+
System.nanoTime(),
395+
false
395396
);
396397
}
397398

x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlActionBreakerIT.java

Lines changed: 21 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -95,15 +95,30 @@ private EsqlQueryResponse runWithBreaking(EsqlQueryRequest request) throws Circu
9595

9696
@Override
9797
protected EsqlQueryResponse run(EsqlQueryRequest request) {
98+
if (randomBoolean()) {
99+
request.allowPartialResults(randomBoolean());
100+
}
101+
Exception failure = null;
98102
try {
99-
return runWithBreaking(request);
100-
} catch (Exception e) {
101-
try (EsqlQueryResponse resp = super.run(request)) {
102-
assertThat(e, instanceOf(CircuitBreakingException.class));
103-
assertThat(ExceptionsHelper.status(e), equalTo(RestStatus.TOO_MANY_REQUESTS));
104-
resp.incRef();
103+
final EsqlQueryResponse resp = runWithBreaking(request);
104+
if (resp.isPartial() == false) {
105105
return resp;
106106
}
107+
try (resp) {
108+
assertTrue(request.allowPartialResults());
109+
}
110+
} catch (Exception e) {
111+
failure = e;
112+
}
113+
// Re-run if the previous query failed or returned partial results
114+
// Only check the previous failure if the second query succeeded
115+
try (EsqlQueryResponse resp = super.run(request)) {
116+
if (failure != null) {
117+
assertThat(failure, instanceOf(CircuitBreakingException.class));
118+
assertThat(ExceptionsHelper.status(failure), equalTo(RestStatus.TOO_MANY_REQUESTS));
119+
}
120+
resp.incRef();
121+
return resp;
107122
}
108123
}
109124

x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlDisruptionIT.java

Lines changed: 44 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -83,38 +83,58 @@ private EsqlQueryResponse runQueryWithDisruption(EsqlQueryRequest request) {
8383
logger.info("--> start disruption scheme [{}]", disruptionScheme);
8484
disruptionScheme.startDisrupting();
8585
logger.info("--> executing esql query with disruption {} ", request.query());
86+
if (randomBoolean()) {
87+
request.allowPartialResults(randomBoolean());
88+
}
8689
ActionFuture<EsqlQueryResponse> future = client().execute(EsqlQueryAction.INSTANCE, request);
90+
EsqlQueryResponse resp = null;
8791
try {
88-
return future.actionGet(2, TimeUnit.MINUTES);
92+
resp = future.actionGet(2, TimeUnit.MINUTES);
93+
if (resp.isPartial() == false) {
94+
return resp;
95+
}
8996
} catch (Exception ignored) {
9097

9198
} finally {
9299
clearDisruption();
93100
}
94-
try {
95-
return future.actionGet(2, TimeUnit.MINUTES);
96-
} catch (Exception e) {
97-
logger.info(
98-
"running tasks: {}",
99-
client().admin()
100-
.cluster()
101-
.prepareListTasks()
102-
.get()
103-
.getTasks()
104-
.stream()
105-
.filter(
106-
// Skip the tasks we that'd get in the way while debugging
107-
t -> false == t.action().contains(TransportListTasksAction.TYPE.name())
108-
&& false == t.action().contains(HealthNode.TASK_NAME)
109-
)
110-
.toList()
111-
);
112-
assertTrue("request must be failed or completed after clearing disruption", future.isDone());
113-
ensureBlocksReleased();
114-
logger.info("--> failed to execute esql query with disruption; retrying...", e);
115-
EsqlTestUtils.assertEsqlFailure(e);
116-
return client().execute(EsqlQueryAction.INSTANCE, request).actionGet(2, TimeUnit.MINUTES);
101+
// wait for the response after clear disruption
102+
if (resp == null) {
103+
try {
104+
resp = future.actionGet(2, TimeUnit.MINUTES);
105+
} catch (Exception e) {
106+
logger.info(
107+
"running tasks: {}",
108+
client().admin()
109+
.cluster()
110+
.prepareListTasks()
111+
.get()
112+
.getTasks()
113+
.stream()
114+
.filter(
115+
// Skip the tasks we that'd get in the way while debugging
116+
t -> false == t.action().contains(TransportListTasksAction.TYPE.name())
117+
&& false == t.action().contains(HealthNode.TASK_NAME)
118+
)
119+
.toList()
120+
);
121+
assertTrue("request must be failed or completed after clearing disruption", future.isDone());
122+
ensureBlocksReleased();
123+
logger.info("--> failed to execute esql query with disruption; retrying...", e);
124+
EsqlTestUtils.assertEsqlFailure(e);
125+
}
126+
}
127+
// use the response if it's not partial
128+
if (resp != null) {
129+
if (resp.isPartial() == false) {
130+
return resp;
131+
}
132+
try (var ignored = resp) {
133+
assertTrue(request.allowPartialResults());
134+
}
117135
}
136+
// re-run the query
137+
return super.run(request);
118138
}
119139

120140
private ServiceDisruptionScheme addRandomDisruptionScheme() {

x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlNodeFailureIT.java

Lines changed: 58 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,17 @@
1616
import org.elasticsearch.test.FailingFieldPlugin;
1717
import org.elasticsearch.xcontent.XContentBuilder;
1818
import org.elasticsearch.xcontent.json.JsonXContent;
19+
import org.elasticsearch.xpack.esql.EsqlTestUtils;
1920

20-
import java.io.IOException;
2121
import java.util.ArrayList;
2222
import java.util.Collection;
23+
import java.util.HashSet;
2324
import java.util.List;
25+
import java.util.Set;
2426

2527
import static org.hamcrest.Matchers.equalTo;
28+
import static org.hamcrest.Matchers.in;
29+
import static org.hamcrest.Matchers.lessThanOrEqualTo;
2630

2731
/**
2832
* Make sure the failures on the data node come back as failures over the wire.
@@ -48,10 +52,7 @@ protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) {
4852
return settings;
4953
}
5054

51-
/**
52-
* Use a runtime field that fails when loading field values to fail the entire query.
53-
*/
54-
public void testFailureLoadingFields() throws IOException {
55+
public Set<String> populateIndices() throws Exception {
5556
XContentBuilder mapping = JsonXContent.contentBuilder().startObject();
5657
mapping.startObject("runtime");
5758
{
@@ -63,17 +64,62 @@ public void testFailureLoadingFields() throws IOException {
6364
mapping.endObject();
6465
}
6566
mapping.endObject();
66-
client().admin().indices().prepareCreate("fail").setSettings(indexSettings(1, 0)).setMapping(mapping.endObject()).get();
67-
68-
int docCount = 50;
69-
List<IndexRequestBuilder> docs = new ArrayList<>(docCount);
70-
for (int d = 0; d < docCount; d++) {
71-
docs.add(client().prepareIndex("ok").setSource("foo", d));
67+
client().admin().indices().prepareCreate("fail").setMapping(mapping.endObject()).get();
68+
int okCount = between(1, 50);
69+
Set<String> okIds = new HashSet<>();
70+
List<IndexRequestBuilder> docs = new ArrayList<>(okCount);
71+
for (int d = 0; d < okCount; d++) {
72+
String id = "ok-" + d;
73+
okIds.add(id);
74+
docs.add(client().prepareIndex("ok").setId(id).setSource("foo", d));
75+
}
76+
int failCount = between(1, 50);
77+
for (int d = 0; d < failCount; d++) {
78+
docs.add(client().prepareIndex("fail").setId("fail-" + d).setSource("foo", d));
7279
}
73-
docs.add(client().prepareIndex("fail").setSource("foo", 0));
7480
indexRandom(true, docs);
81+
return okIds;
82+
}
7583

84+
/**
85+
* Use a runtime field that fails when loading field values to fail the entire query.
86+
*/
87+
public void testFailureLoadingFields() throws Exception {
88+
populateIndices();
7689
IllegalStateException e = expectThrows(IllegalStateException.class, () -> run("FROM fail,ok | LIMIT 100").close());
7790
assertThat(e.getMessage(), equalTo("Accessing failing field"));
7891
}
92+
93+
public void testPartialResults() throws Exception {
94+
Set<String> okIds = populateIndices();
95+
{
96+
EsqlQueryRequest request = new EsqlQueryRequest();
97+
request.query("FROM fail,ok | LIMIT 100");
98+
request.allowPartialResults(true);
99+
request.pragmas(randomPragmas());
100+
try (EsqlQueryResponse resp = run(request)) {
101+
assertTrue(resp.isPartial());
102+
List<List<Object>> rows = EsqlTestUtils.getValuesList(resp);
103+
assertThat(rows.size(), lessThanOrEqualTo(okIds.size()));
104+
}
105+
}
106+
{
107+
EsqlQueryRequest request = new EsqlQueryRequest();
108+
request.query("FROM fail,ok METADATA _id | KEEP _id, fail_me | LIMIT 100");
109+
request.allowPartialResults(true);
110+
request.pragmas(randomPragmas());
111+
try (EsqlQueryResponse resp = run(request)) {
112+
assertTrue(resp.isPartial());
113+
List<List<Object>> rows = EsqlTestUtils.getValuesList(resp);
114+
assertThat(rows.size(), lessThanOrEqualTo(okIds.size()));
115+
Set<String> actualIds = new HashSet<>();
116+
for (List<Object> row : rows) {
117+
assertThat(row.size(), equalTo(2));
118+
String id = (String) row.getFirst();
119+
assertThat(id, in(okIds));
120+
assertTrue(actualIds.add(id));
121+
}
122+
}
123+
}
124+
}
79125
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -819,7 +819,12 @@ public enum Cap {
819819
* Fixes a series of issues with inlinestats which had an incomplete implementation after lookup and inlinestats
820820
* were refactored.
821821
*/
822-
INLINESTATS_V3(EsqlPlugin.INLINESTATS_FEATURE_FLAG);
822+
INLINESTATS_V3(EsqlPlugin.INLINESTATS_FEATURE_FLAG),
823+
824+
/**
825+
* Support partial_results
826+
*/
827+
SUPPORT_PARTIAL_RESULTS;
823828

824829
private final boolean enabled;
825830

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlExecutionInfo.java

Lines changed: 11 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -246,7 +246,13 @@ public Map<String, Cluster> getClusters() {
246246
* @return the new Cluster object
247247
*/
248248
public Cluster swapCluster(String clusterAlias, BiFunction<String, Cluster, Cluster> remappingFunction) {
249-
return clusterInfo.compute(clusterAlias, remappingFunction);
249+
return clusterInfo.compute(clusterAlias, (unused, oldCluster) -> {
250+
final Cluster newCluster = remappingFunction.apply(clusterAlias, oldCluster);
251+
if (newCluster != null && isPartial == false) {
252+
isPartial = newCluster.isPartial();
253+
}
254+
return newCluster;
255+
});
250256
}
251257

252258
@Override
@@ -305,13 +311,6 @@ public boolean isPartial() {
305311
return isPartial;
306312
}
307313

308-
/**
309-
* Mark the query as having partial results.
310-
*/
311-
public void markAsPartial() {
312-
isPartial = true;
313-
}
314-
315314
public void markAsStopped() {
316315
isStopped = true;
317316
}
@@ -320,13 +319,6 @@ public boolean isStopped() {
320319
return isStopped;
321320
}
322321

323-
/**
324-
* Mark this cluster as having partial results.
325-
*/
326-
public void markClusterAsPartial(String clusterAlias) {
327-
swapCluster(clusterAlias, (k, v) -> new Cluster.Builder(v).setStatus(Cluster.Status.PARTIAL).build());
328-
}
329-
330322
/**
331323
* Represents the search metadata about a particular cluster involved in a cross-cluster search.
332324
* The Cluster object can represent either the local cluster or a remote cluster.
@@ -618,6 +610,10 @@ public List<ShardSearchFailure> getFailures() {
618610
return failures;
619611
}
620612

613+
boolean isPartial() {
614+
return status == Status.PARTIAL || status == Status.SKIPPED || (failedShards != null && failedShards > 0);
615+
}
616+
621617
@Override
622618
public boolean equals(Object o) {
623619
if (this == o) return true;

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlQueryRequest.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ public class EsqlQueryRequest extends org.elasticsearch.xpack.core.esql.action.E
5252
private boolean keepOnCompletion;
5353
private boolean onSnapshotBuild = Build.current().isSnapshot();
5454
private boolean acceptedPragmaRisks = false;
55+
private boolean allowPartialResults = false;
5556

5657
/**
5758
* "Tables" provided in the request for use with things like {@code LOOKUP}.
@@ -231,6 +232,14 @@ public Map<String, Map<String, Column>> tables() {
231232
return tables;
232233
}
233234

235+
public boolean allowPartialResults() {
236+
return allowPartialResults;
237+
}
238+
239+
public void allowPartialResults(boolean allowPartialResults) {
240+
this.allowPartialResults = allowPartialResults;
241+
}
242+
234243
@Override
235244
public Task createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
236245
// Pass the query as the description

0 commit comments

Comments
 (0)