Skip to content

Commit 868da8e

Browse files
Merge remote-tracking branch 'elastic/main' into short-ciruit-local-node
2 parents 03374cb + b07ba89 commit 868da8e

File tree

27 files changed

+334
-118
lines changed

27 files changed

+334
-118
lines changed

docs/changelog/121942.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 121942
2+
summary: Allow partial results in ES|QL
3+
area: ES|QL
4+
type: enhancement
5+
issues: []

muted-tests.yml

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -69,9 +69,6 @@ tests:
6969
- class: org.elasticsearch.xpack.test.rest.XPackRestIT
7070
method: test {p0=transform/transforms_start_stop/Test start already started transform}
7171
issue: https://github.com/elastic/elasticsearch/issues/98802
72-
- class: org.elasticsearch.action.search.SearchPhaseControllerTests
73-
method: testProgressListener
74-
issue: https://github.com/elastic/elasticsearch/issues/116149
7572
- class: org.elasticsearch.xpack.deprecation.DeprecationHttpIT
7673
method: testDeprecatedSettingsReturnWarnings
7774
issue: https://github.com/elastic/elasticsearch/issues/108628
@@ -377,6 +374,27 @@ tests:
377374
- class: org.elasticsearch.xpack.inference.mapper.SemanticInferenceMetadataFieldsRecoveryTests
378375
method: testSnapshotRecovery {p0=false p1=true}
379376
issue: https://github.com/elastic/elasticsearch/issues/122551
377+
- class: org.elasticsearch.index.mapper.ShapeGeometryFieldMapperTests
378+
method: testCartesianBoundsBlockLoader
379+
issue: https://github.com/elastic/elasticsearch/issues/122661
380+
- class: org.elasticsearch.entitlement.runtime.policy.PolicyParserTests
381+
method: testPolicyBuilderOnExternalPlugin
382+
issue: https://github.com/elastic/elasticsearch/issues/122663
383+
- class: org.elasticsearch.entitlement.runtime.policy.PolicyParserTests
384+
method: testParseFiles
385+
issue: https://github.com/elastic/elasticsearch/issues/122664
386+
- class: org.elasticsearch.entitlement.runtime.policy.PolicyParserTests
387+
method: testPolicyBuilder
388+
issue: https://github.com/elastic/elasticsearch/issues/122665
389+
- class: org.elasticsearch.entitlement.runtime.policy.PolicyParserFailureTests
390+
method: testEntitlementAbsolutePathWhenRelative
391+
issue: https://github.com/elastic/elasticsearch/issues/122666
392+
- class: org.elasticsearch.entitlement.qa.EntitlementsAllowedNonModularIT
393+
issue: https://github.com/elastic/elasticsearch/issues/122568
394+
- class: org.elasticsearch.entitlement.qa.EntitlementsDeniedIT
395+
issue: https://github.com/elastic/elasticsearch/issues/122566
396+
- class: org.elasticsearch.entitlement.qa.EntitlementsDeniedNonModularIT
397+
issue: https://github.com/elastic/elasticsearch/issues/122569
380398

381399
# Examples:
382400
#

server/src/main/java/org/elasticsearch/TransportVersions.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -192,6 +192,7 @@ static TransportVersion def(int id) {
192192
public static final TransportVersion ESQL_LOOKUP_JOIN_SOURCE_TEXT = def(9_008_0_00);
193193
public static final TransportVersion REMOVE_ALL_APPLICABLE_SELECTOR = def(9_009_0_00);
194194
public static final TransportVersion SLM_UNHEALTHY_IF_NO_SNAPSHOT_WITHIN = def(9_010_0_00);
195+
public static final TransportVersion ESQL_SUPPORT_PARTIAL_RESULTS = def(9_011_0_00);
195196

196197
/*
197198
* STOP! READ THIS FIRST! No, really,

x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/ConfigurationTestUtils.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,8 @@ public static Configuration randomConfiguration(String query, Map<String, Map<St
7171
query,
7272
profile,
7373
tables,
74-
System.nanoTime()
74+
System.nanoTime(),
75+
false
7576
);
7677
}
7778

x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/EsqlTestUtils.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -391,7 +391,8 @@ public static Configuration configuration(QueryPragmas pragmas, String query) {
391391
query,
392392
false,
393393
TABLES,
394-
System.nanoTime()
394+
System.nanoTime(),
395+
false
395396
);
396397
}
397398

x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlActionBreakerIT.java

Lines changed: 21 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -95,15 +95,30 @@ private EsqlQueryResponse runWithBreaking(EsqlQueryRequest request) throws Circu
9595

9696
@Override
9797
protected EsqlQueryResponse run(EsqlQueryRequest request) {
98+
if (randomBoolean()) {
99+
request.allowPartialResults(randomBoolean());
100+
}
101+
Exception failure = null;
98102
try {
99-
return runWithBreaking(request);
100-
} catch (Exception e) {
101-
try (EsqlQueryResponse resp = super.run(request)) {
102-
assertThat(e, instanceOf(CircuitBreakingException.class));
103-
assertThat(ExceptionsHelper.status(e), equalTo(RestStatus.TOO_MANY_REQUESTS));
104-
resp.incRef();
103+
final EsqlQueryResponse resp = runWithBreaking(request);
104+
if (resp.isPartial() == false) {
105105
return resp;
106106
}
107+
try (resp) {
108+
assertTrue(request.allowPartialResults());
109+
}
110+
} catch (Exception e) {
111+
failure = e;
112+
}
113+
// Re-run if the previous query failed or returned partial results
114+
// Only check the previous failure if the second query succeeded
115+
try (EsqlQueryResponse resp = super.run(request)) {
116+
if (failure != null) {
117+
assertThat(failure, instanceOf(CircuitBreakingException.class));
118+
assertThat(ExceptionsHelper.status(failure), equalTo(RestStatus.TOO_MANY_REQUESTS));
119+
}
120+
resp.incRef();
121+
return resp;
107122
}
108123
}
109124

x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlDisruptionIT.java

Lines changed: 44 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -83,38 +83,58 @@ private EsqlQueryResponse runQueryWithDisruption(EsqlQueryRequest request) {
8383
logger.info("--> start disruption scheme [{}]", disruptionScheme);
8484
disruptionScheme.startDisrupting();
8585
logger.info("--> executing esql query with disruption {} ", request.query());
86+
if (randomBoolean()) {
87+
request.allowPartialResults(randomBoolean());
88+
}
8689
ActionFuture<EsqlQueryResponse> future = client().execute(EsqlQueryAction.INSTANCE, request);
90+
EsqlQueryResponse resp = null;
8791
try {
88-
return future.actionGet(2, TimeUnit.MINUTES);
92+
resp = future.actionGet(2, TimeUnit.MINUTES);
93+
if (resp.isPartial() == false) {
94+
return resp;
95+
}
8996
} catch (Exception ignored) {
9097

9198
} finally {
9299
clearDisruption();
93100
}
94-
try {
95-
return future.actionGet(2, TimeUnit.MINUTES);
96-
} catch (Exception e) {
97-
logger.info(
98-
"running tasks: {}",
99-
client().admin()
100-
.cluster()
101-
.prepareListTasks()
102-
.get()
103-
.getTasks()
104-
.stream()
105-
.filter(
106-
// Skip the tasks we that'd get in the way while debugging
107-
t -> false == t.action().contains(TransportListTasksAction.TYPE.name())
108-
&& false == t.action().contains(HealthNode.TASK_NAME)
109-
)
110-
.toList()
111-
);
112-
assertTrue("request must be failed or completed after clearing disruption", future.isDone());
113-
ensureBlocksReleased();
114-
logger.info("--> failed to execute esql query with disruption; retrying...", e);
115-
EsqlTestUtils.assertEsqlFailure(e);
116-
return client().execute(EsqlQueryAction.INSTANCE, request).actionGet(2, TimeUnit.MINUTES);
101+
// wait for the response after clearing the disruption
102+
if (resp == null) {
103+
try {
104+
resp = future.actionGet(2, TimeUnit.MINUTES);
105+
} catch (Exception e) {
106+
logger.info(
107+
"running tasks: {}",
108+
client().admin()
109+
.cluster()
110+
.prepareListTasks()
111+
.get()
112+
.getTasks()
113+
.stream()
114+
.filter(
115+
// Skip the tasks that'd get in the way while debugging
116+
t -> false == t.action().contains(TransportListTasksAction.TYPE.name())
117+
&& false == t.action().contains(HealthNode.TASK_NAME)
118+
)
119+
.toList()
120+
);
121+
assertTrue("request must be failed or completed after clearing disruption", future.isDone());
122+
ensureBlocksReleased();
123+
logger.info("--> failed to execute esql query with disruption; retrying...", e);
124+
EsqlTestUtils.assertEsqlFailure(e);
125+
}
126+
}
127+
// use the response if it's not partial
128+
if (resp != null) {
129+
if (resp.isPartial() == false) {
130+
return resp;
131+
}
132+
try (var ignored = resp) {
133+
assertTrue(request.allowPartialResults());
134+
}
117135
}
136+
// re-run the query
137+
return super.run(request);
118138
}
119139

120140
private ServiceDisruptionScheme addRandomDisruptionScheme() {

x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlNodeFailureIT.java

Lines changed: 58 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,17 @@
1616
import org.elasticsearch.test.FailingFieldPlugin;
1717
import org.elasticsearch.xcontent.XContentBuilder;
1818
import org.elasticsearch.xcontent.json.JsonXContent;
19+
import org.elasticsearch.xpack.esql.EsqlTestUtils;
1920

20-
import java.io.IOException;
2121
import java.util.ArrayList;
2222
import java.util.Collection;
23+
import java.util.HashSet;
2324
import java.util.List;
25+
import java.util.Set;
2426

2527
import static org.hamcrest.Matchers.equalTo;
28+
import static org.hamcrest.Matchers.in;
29+
import static org.hamcrest.Matchers.lessThanOrEqualTo;
2630

2731
/**
2832
* Make sure the failures on the data node come back as failures over the wire.
@@ -48,10 +52,7 @@ protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) {
4852
return settings;
4953
}
5054

51-
/**
52-
* Use a runtime field that fails when loading field values to fail the entire query.
53-
*/
54-
public void testFailureLoadingFields() throws IOException {
55+
public Set<String> populateIndices() throws Exception {
5556
XContentBuilder mapping = JsonXContent.contentBuilder().startObject();
5657
mapping.startObject("runtime");
5758
{
@@ -63,17 +64,62 @@ public void testFailureLoadingFields() throws IOException {
6364
mapping.endObject();
6465
}
6566
mapping.endObject();
66-
client().admin().indices().prepareCreate("fail").setSettings(indexSettings(1, 0)).setMapping(mapping.endObject()).get();
67-
68-
int docCount = 50;
69-
List<IndexRequestBuilder> docs = new ArrayList<>(docCount);
70-
for (int d = 0; d < docCount; d++) {
71-
docs.add(client().prepareIndex("ok").setSource("foo", d));
67+
client().admin().indices().prepareCreate("fail").setMapping(mapping.endObject()).get();
68+
int okCount = between(1, 50);
69+
Set<String> okIds = new HashSet<>();
70+
List<IndexRequestBuilder> docs = new ArrayList<>(okCount);
71+
for (int d = 0; d < okCount; d++) {
72+
String id = "ok-" + d;
73+
okIds.add(id);
74+
docs.add(client().prepareIndex("ok").setId(id).setSource("foo", d));
75+
}
76+
int failCount = between(1, 50);
77+
for (int d = 0; d < failCount; d++) {
78+
docs.add(client().prepareIndex("fail").setId("fail-" + d).setSource("foo", d));
7279
}
73-
docs.add(client().prepareIndex("fail").setSource("foo", 0));
7480
indexRandom(true, docs);
81+
return okIds;
82+
}
7583

84+
/**
85+
* Use a runtime field that fails when loading field values to fail the entire query.
86+
*/
87+
public void testFailureLoadingFields() throws Exception {
88+
populateIndices();
7689
IllegalStateException e = expectThrows(IllegalStateException.class, () -> run("FROM fail,ok | LIMIT 100").close());
7790
assertThat(e.getMessage(), equalTo("Accessing failing field"));
7891
}
92+
93+
public void testPartialResults() throws Exception {
94+
Set<String> okIds = populateIndices();
95+
{
96+
EsqlQueryRequest request = new EsqlQueryRequest();
97+
request.query("FROM fail,ok | LIMIT 100");
98+
request.allowPartialResults(true);
99+
request.pragmas(randomPragmas());
100+
try (EsqlQueryResponse resp = run(request)) {
101+
assertTrue(resp.isPartial());
102+
List<List<Object>> rows = EsqlTestUtils.getValuesList(resp);
103+
assertThat(rows.size(), lessThanOrEqualTo(okIds.size()));
104+
}
105+
}
106+
{
107+
EsqlQueryRequest request = new EsqlQueryRequest();
108+
request.query("FROM fail,ok METADATA _id | KEEP _id, fail_me | LIMIT 100");
109+
request.allowPartialResults(true);
110+
request.pragmas(randomPragmas());
111+
try (EsqlQueryResponse resp = run(request)) {
112+
assertTrue(resp.isPartial());
113+
List<List<Object>> rows = EsqlTestUtils.getValuesList(resp);
114+
assertThat(rows.size(), lessThanOrEqualTo(okIds.size()));
115+
Set<String> actualIds = new HashSet<>();
116+
for (List<Object> row : rows) {
117+
assertThat(row.size(), equalTo(2));
118+
String id = (String) row.getFirst();
119+
assertThat(id, in(okIds));
120+
assertTrue(actualIds.add(id));
121+
}
122+
}
123+
}
124+
}
79125
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -819,7 +819,12 @@ public enum Cap {
819819
* Fixes a series of issues with inlinestats which had an incomplete implementation after lookup and inlinestats
820820
* were refactored.
821821
*/
822-
INLINESTATS_V3(EsqlPlugin.INLINESTATS_FEATURE_FLAG);
822+
INLINESTATS_V3(EsqlPlugin.INLINESTATS_FEATURE_FLAG),
823+
824+
/**
825+
* Support partial_results
826+
*/
827+
SUPPORT_PARTIAL_RESULTS;
823828

824829
private final boolean enabled;
825830

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlExecutionInfo.java

Lines changed: 16 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,7 @@ public class EsqlExecutionInfo implements ChunkedToXContentObject, Writeable {
7676
private final transient Long relativeStartNanos; // start time for an ESQL query for calculating took times
7777
private transient TimeValue planningTookTime; // time elapsed since start of query to calling ComputeService.execute
7878
private volatile boolean isPartial; // Does this request have partial results?
79+
private transient volatile boolean isStopped; // Have we received stop command?
7980

8081
public EsqlExecutionInfo(boolean includeCCSMetadata) {
8182
this(Predicates.always(), includeCCSMetadata); // default all clusters to skip_unavailable=true
@@ -245,7 +246,13 @@ public Map<String, Cluster> getClusters() {
245246
* @return the new Cluster object
246247
*/
247248
public Cluster swapCluster(String clusterAlias, BiFunction<String, Cluster, Cluster> remappingFunction) {
248-
return clusterInfo.compute(clusterAlias, remappingFunction);
249+
return clusterInfo.compute(clusterAlias, (unused, oldCluster) -> {
250+
final Cluster newCluster = remappingFunction.apply(clusterAlias, oldCluster);
251+
if (newCluster != null && isPartial == false) {
252+
isPartial = newCluster.isPartial();
253+
}
254+
return newCluster;
255+
});
249256
}
250257

251258
@Override
@@ -304,18 +311,12 @@ public boolean isPartial() {
304311
return isPartial;
305312
}
306313

307-
/**
308-
* Mark the query as having partial results.
309-
*/
310-
public void markAsPartial() {
311-
isPartial = true;
314+
public void markAsStopped() {
315+
isStopped = true;
312316
}
313317

314-
/**
315-
* Mark this cluster as having partial results.
316-
*/
317-
public void markClusterAsPartial(String clusterAlias) {
318-
swapCluster(clusterAlias, (k, v) -> new Cluster.Builder(v).setStatus(Cluster.Status.PARTIAL).build());
318+
public boolean isStopped() {
319+
return isStopped;
319320
}
320321

321322
/**
@@ -609,6 +610,10 @@ public List<ShardSearchFailure> getFailures() {
609610
return failures;
610611
}
611612

613+
boolean isPartial() {
614+
return status == Status.PARTIAL || status == Status.SKIPPED || (failedShards != null && failedShards > 0);
615+
}
616+
612617
@Override
613618
public boolean equals(Object o) {
614619
if (this == o) return true;

0 commit comments

Comments
 (0)