Skip to content

Commit 9af5872

Browse files
committed
Allow partial results in ES|QL
1 parent f8aa047 commit 9af5872

File tree

24 files changed

+269
-74
lines changed

24 files changed

+269
-74
lines changed

server/src/main/java/org/elasticsearch/TransportVersions.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -189,6 +189,7 @@ static TransportVersion def(int id) {
189189
public static final TransportVersion ESQL_PROFILE_ASYNC_NANOS = def(9_007_00_0);
190190
public static final TransportVersion ESQL_LOOKUP_JOIN_SOURCE_TEXT = def(9_008_0_00);
191191
public static final TransportVersion REMOVE_ALL_APPLICABLE_SELECTOR = def(9_009_0_00);
192+
public static final TransportVersion ESQL_SUPPORT_PARTIAL_RESULTS = def(9_010_0_00);
192193

193194
/*
194195
* STOP! READ THIS FIRST! No, really,

x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/ConfigurationTestUtils.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,8 @@ public static Configuration randomConfiguration(String query, Map<String, Map<St
7171
query,
7272
profile,
7373
tables,
74-
System.nanoTime()
74+
System.nanoTime(),
75+
false
7576
);
7677
}
7778

x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/EsqlTestUtils.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -388,7 +388,8 @@ public static Configuration configuration(QueryPragmas pragmas, String query) {
388388
query,
389389
false,
390390
TABLES,
391-
System.nanoTime()
391+
System.nanoTime(),
392+
false
392393
);
393394
}
394395

x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlActionBreakerIT.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,16 @@ private EsqlQueryResponse runWithBreaking(EsqlQueryRequest request) throws Circu
9696
@Override
9797
protected EsqlQueryResponse run(EsqlQueryRequest request) {
9898
try {
99-
return runWithBreaking(request);
99+
if (randomBoolean()) {
100+
request.allowPartialResults(randomBoolean());
101+
}
102+
var resp = runWithBreaking(request);
103+
if (resp.isPartial() == false) {
104+
return resp;
105+
}
106+
try (resp) {
107+
assertTrue(request.allowPartialResults());
108+
}
100109
} catch (Exception e) {
101110
try (EsqlQueryResponse resp = super.run(request)) {
102111
assertThat(e, instanceOf(CircuitBreakingException.class));
@@ -105,6 +114,7 @@ protected EsqlQueryResponse run(EsqlQueryRequest request) {
105114
return resp;
106115
}
107116
}
117+
return super.run(request);
108118
}
109119

110120
/**

x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlDisruptionIT.java

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -83,16 +83,31 @@ private EsqlQueryResponse runQueryWithDisruption(EsqlQueryRequest request) {
8383
logger.info("--> start disruption scheme [{}]", disruptionScheme);
8484
disruptionScheme.startDisrupting();
8585
logger.info("--> executing esql query with disruption {} ", request.query());
86+
if (randomBoolean()) {
87+
request.allowPartialResults(randomBoolean());
88+
}
8689
ActionFuture<EsqlQueryResponse> future = client().execute(EsqlQueryAction.INSTANCE, request);
8790
try {
88-
return future.actionGet(2, TimeUnit.MINUTES);
91+
var resp = future.actionGet(2, TimeUnit.MINUTES);
92+
if (resp.isPartial() == false) {
93+
return resp;
94+
}
95+
try (resp) {
96+
assertTrue(request.allowPartialResults());
97+
}
8998
} catch (Exception ignored) {
9099

91100
} finally {
92101
clearDisruption();
93102
}
94103
try {
95-
return future.actionGet(2, TimeUnit.MINUTES);
104+
var resp = future.actionGet(2, TimeUnit.MINUTES);
105+
if (resp.isPartial() == false) {
106+
return resp;
107+
}
108+
try (resp) {
109+
assertTrue(request.allowPartialResults());
110+
}
96111
} catch (Exception e) {
97112
logger.info(
98113
"running tasks: {}",
@@ -113,8 +128,8 @@ private EsqlQueryResponse runQueryWithDisruption(EsqlQueryRequest request) {
113128
ensureBlocksReleased();
114129
logger.info("--> failed to execute esql query with disruption; retrying...", e);
115130
EsqlTestUtils.assertEsqlFailure(e);
116-
return client().execute(EsqlQueryAction.INSTANCE, request).actionGet(2, TimeUnit.MINUTES);
117131
}
132+
return super.run(request);
118133
}
119134

120135
private ServiceDisruptionScheme addRandomDisruptionScheme() {

x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlNodeFailureIT.java

Lines changed: 58 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,17 @@
1616
import org.elasticsearch.test.FailingFieldPlugin;
1717
import org.elasticsearch.xcontent.XContentBuilder;
1818
import org.elasticsearch.xcontent.json.JsonXContent;
19+
import org.elasticsearch.xpack.esql.EsqlTestUtils;
1920

20-
import java.io.IOException;
2121
import java.util.ArrayList;
2222
import java.util.Collection;
23+
import java.util.HashSet;
2324
import java.util.List;
25+
import java.util.Set;
2426

2527
import static org.hamcrest.Matchers.equalTo;
28+
import static org.hamcrest.Matchers.in;
29+
import static org.hamcrest.Matchers.lessThanOrEqualTo;
2630

2731
/**
2832
* Make sure the failures on the data node come back as failures over the wire.
@@ -48,10 +52,7 @@ protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) {
4852
return settings;
4953
}
5054

51-
/**
52-
* Use a runtime field that fails when loading field values to fail the entire query.
53-
*/
54-
public void testFailureLoadingFields() throws IOException {
55+
public Set<String> populateIndices() throws Exception {
5556
XContentBuilder mapping = JsonXContent.contentBuilder().startObject();
5657
mapping.startObject("runtime");
5758
{
@@ -63,17 +64,62 @@ public void testFailureLoadingFields() throws IOException {
6364
mapping.endObject();
6465
}
6566
mapping.endObject();
66-
client().admin().indices().prepareCreate("fail").setSettings(indexSettings(1, 0)).setMapping(mapping.endObject()).get();
67-
68-
int docCount = 50;
69-
List<IndexRequestBuilder> docs = new ArrayList<>(docCount);
70-
for (int d = 0; d < docCount; d++) {
71-
docs.add(client().prepareIndex("ok").setSource("foo", d));
67+
client().admin().indices().prepareCreate("fail").setMapping(mapping.endObject()).get();
68+
int okCount = between(1, 50);
69+
Set<String> okIds = new HashSet<>();
70+
List<IndexRequestBuilder> docs = new ArrayList<>(okCount);
71+
for (int d = 0; d < okCount; d++) {
72+
String id = "ok-" + d;
73+
okIds.add(id);
74+
docs.add(client().prepareIndex("ok").setId(id).setSource("foo", d));
75+
}
76+
int failCount = between(1, 50);
77+
for (int d = 0; d < failCount; d++) {
78+
docs.add(client().prepareIndex("fail").setId("fail-" + d).setSource("foo", d));
7279
}
73-
docs.add(client().prepareIndex("fail").setSource("foo", 0));
7480
indexRandom(true, docs);
81+
return okIds;
82+
}
7583

84+
/**
85+
* Use a runtime field that fails when loading field values to fail the entire query.
86+
*/
87+
public void testFailureLoadingFields() throws Exception {
88+
populateIndices();
7689
IllegalStateException e = expectThrows(IllegalStateException.class, () -> run("FROM fail,ok | LIMIT 100").close());
7790
assertThat(e.getMessage(), equalTo("Accessing failing field"));
7891
}
92+
93+
public void testPartialResults() throws Exception {
94+
Set<String> okIds = populateIndices();
95+
{
96+
EsqlQueryRequest request = new EsqlQueryRequest();
97+
request.query("FROM fail,ok | LIMIT 100");
98+
request.allowPartialResults(true);
99+
request.pragmas(randomPragmas());
100+
try (EsqlQueryResponse resp = run(request)) {
101+
assertTrue(resp.isPartial());
102+
List<List<Object>> rows = EsqlTestUtils.getValuesList(resp);
103+
assertThat(rows.size(), lessThanOrEqualTo(okIds.size()));
104+
}
105+
}
106+
{
107+
EsqlQueryRequest request = new EsqlQueryRequest();
108+
request.query("FROM fail,ok METADATA _id | KEEP _id, fail_me | LIMIT 100");
109+
request.allowPartialResults(true);
110+
request.pragmas(randomPragmas());
111+
try (EsqlQueryResponse resp = run(request)) {
112+
assertTrue(resp.isPartial());
113+
List<List<Object>> rows = EsqlTestUtils.getValuesList(resp);
114+
assertThat(rows.size(), lessThanOrEqualTo(okIds.size()));
115+
Set<String> actualIds = new HashSet<>();
116+
for (List<Object> row : rows) {
117+
assertThat(row.size(), equalTo(2));
118+
String id = (String) row.getFirst();
119+
assertThat(id, in(okIds));
120+
assertTrue(actualIds.add(id));
121+
}
122+
}
123+
}
124+
}
79125
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -809,7 +809,12 @@ public enum Cap {
809809
* Fixes a series of issues with inlinestats which had an incomplete implementation after lookup and inlinestats
810810
* were refactored.
811811
*/
812-
INLINESTATS_V3(EsqlPlugin.INLINESTATS_FEATURE_FLAG);
812+
INLINESTATS_V3(EsqlPlugin.INLINESTATS_FEATURE_FLAG),
813+
814+
/**
815+
* Support partial_results
816+
*/
817+
SUPPORT_PARTIAL_RESULTS;
813818

814819
private final boolean enabled;
815820

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlExecutionInfo.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -174,6 +174,9 @@ public TimeValue planningTookTime() {
174174
public void markEndQuery() {
175175
assert relativeStartNanos != null : "Relative start time must be set when markEndQuery is called";
176176
overallTook = new TimeValue(System.nanoTime() - relativeStartNanos, TimeUnit.NANOSECONDS);
177+
if (isPartial == false) {
178+
isPartial = clusterInfo.values().stream().anyMatch(c -> c.failedShards != null && c.failedShards > 0);
179+
}
177180
}
178181

179182
// for testing only - use markEndQuery in production code

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlQueryRequest.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ public class EsqlQueryRequest extends org.elasticsearch.xpack.core.esql.action.E
5252
private boolean keepOnCompletion;
5353
private boolean onSnapshotBuild = Build.current().isSnapshot();
5454
private boolean acceptedPragmaRisks = false;
55+
private boolean allowPartialResults = false;
5556

5657
/**
5758
* "Tables" provided in the request for use with things like {@code LOOKUP}.
@@ -231,6 +232,14 @@ public Map<String, Map<String, Column>> tables() {
231232
return tables;
232233
}
233234

235+
public boolean allowPartialResults() {
236+
return allowPartialResults;
237+
}
238+
239+
public void allowPartialResults(boolean allowPartialResults) {
240+
this.allowPartialResults = allowPartialResults;
241+
}
242+
234243
@Override
235244
public Task createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
236245
// Pass the query as the description

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlQueryResponse.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -181,6 +181,10 @@ public boolean isAsync() {
181181
return isRunning;
182182
}
183183

184+
public boolean isPartial() {
185+
return executionInfo != null && executionInfo.isPartial();
186+
}
187+
184188
public EsqlExecutionInfo getExecutionInfo() {
185189
return executionInfo;
186190
}

0 commit comments

Comments
 (0)