Skip to content

Commit 2a21640

Browse files
Merge branch 'main' into es-integ-test-case-after-test
2 parents 7d4f074 + b07ba89 commit 2a21640

File tree

30 files changed

+371
-139
lines changed

30 files changed

+371
-139
lines changed

.github/CODEOWNERS

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ distribution/docker/src @elastic/es-delivery
4949
# Core/Infra
5050
distribution/tools @elastic/es-core-infra
5151
libs/core @elastic/es-core-infra
52+
libs/entitlement @elastic/es-core-infra
5253
libs/logging @elastic/es-core-infra
5354
libs/native @elastic/es-core-infra
5455
libs/plugin-analysis-api @elastic/es-core-infra

docs/changelog/121942.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 121942
2+
summary: Allow partial results in ES|QL
3+
area: ES|QL
4+
type: enhancement
5+
issues: []

muted-tests.yml

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -69,9 +69,6 @@ tests:
6969
- class: org.elasticsearch.xpack.test.rest.XPackRestIT
7070
method: test {p0=transform/transforms_start_stop/Test start already started transform}
7171
issue: https://github.com/elastic/elasticsearch/issues/98802
72-
- class: org.elasticsearch.action.search.SearchPhaseControllerTests
73-
method: testProgressListener
74-
issue: https://github.com/elastic/elasticsearch/issues/116149
7572
- class: org.elasticsearch.xpack.deprecation.DeprecationHttpIT
7673
method: testDeprecatedSettingsReturnWarnings
7774
issue: https://github.com/elastic/elasticsearch/issues/108628
@@ -377,6 +374,27 @@ tests:
377374
- class: org.elasticsearch.xpack.inference.mapper.SemanticInferenceMetadataFieldsRecoveryTests
378375
method: testSnapshotRecovery {p0=false p1=true}
379376
issue: https://github.com/elastic/elasticsearch/issues/122551
377+
- class: org.elasticsearch.index.mapper.ShapeGeometryFieldMapperTests
378+
method: testCartesianBoundsBlockLoader
379+
issue: https://github.com/elastic/elasticsearch/issues/122661
380+
- class: org.elasticsearch.entitlement.runtime.policy.PolicyParserTests
381+
method: testPolicyBuilderOnExternalPlugin
382+
issue: https://github.com/elastic/elasticsearch/issues/122663
383+
- class: org.elasticsearch.entitlement.runtime.policy.PolicyParserTests
384+
method: testParseFiles
385+
issue: https://github.com/elastic/elasticsearch/issues/122664
386+
- class: org.elasticsearch.entitlement.runtime.policy.PolicyParserTests
387+
method: testPolicyBuilder
388+
issue: https://github.com/elastic/elasticsearch/issues/122665
389+
- class: org.elasticsearch.entitlement.runtime.policy.PolicyParserFailureTests
390+
method: testEntitlementAbsolutePathWhenRelative
391+
issue: https://github.com/elastic/elasticsearch/issues/122666
392+
- class: org.elasticsearch.entitlement.qa.EntitlementsAllowedNonModularIT
393+
issue: https://github.com/elastic/elasticsearch/issues/122568
394+
- class: org.elasticsearch.entitlement.qa.EntitlementsDeniedIT
395+
issue: https://github.com/elastic/elasticsearch/issues/122566
396+
- class: org.elasticsearch.entitlement.qa.EntitlementsDeniedNonModularIT
397+
issue: https://github.com/elastic/elasticsearch/issues/122569
380398

381399
# Examples:
382400
#

server/src/main/java/org/elasticsearch/TransportVersions.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -192,6 +192,7 @@ static TransportVersion def(int id) {
192192
public static final TransportVersion ESQL_LOOKUP_JOIN_SOURCE_TEXT = def(9_008_0_00);
193193
public static final TransportVersion REMOVE_ALL_APPLICABLE_SELECTOR = def(9_009_0_00);
194194
public static final TransportVersion SLM_UNHEALTHY_IF_NO_SNAPSHOT_WITHIN = def(9_010_0_00);
195+
public static final TransportVersion ESQL_SUPPORT_PARTIAL_RESULTS = def(9_011_0_00);
195196

196197
/*
197198
* STOP! READ THIS FIRST! No, really,

server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java

Lines changed: 33 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@
6767
import org.elasticsearch.common.util.concurrent.ThreadContext;
6868
import org.elasticsearch.core.Assertions;
6969
import org.elasticsearch.core.Booleans;
70+
import org.elasticsearch.core.CheckedFunction;
7071
import org.elasticsearch.core.IOUtils;
7172
import org.elasticsearch.core.Nullable;
7273
import org.elasticsearch.core.Releasable;
@@ -1020,24 +1021,17 @@ private VersionValue resolveDocVersion(final Operation op, boolean loadSeqNo) th
10201021
VersionValue versionValue = getVersionFromMap(op.uid());
10211022
if (versionValue == null) {
10221023
assert incrementIndexVersionLookup(); // used for asserting in tests
1023-
final VersionsAndSeqNoResolver.DocIdAndVersion docIdAndVersion;
1024-
try (Searcher searcher = acquireSearcher("load_version", SearcherScope.INTERNAL)) {
1025-
if (engineConfig.getIndexSettings().getMode() == IndexMode.TIME_SERIES) {
1026-
assert engineConfig.getLeafSorter() == DataStream.TIMESERIES_LEAF_READERS_SORTER;
1027-
docIdAndVersion = VersionsAndSeqNoResolver.timeSeriesLoadDocIdAndVersion(
1028-
searcher.getIndexReader(),
1029-
op.uid(),
1030-
op.id(),
1031-
loadSeqNo
1032-
);
1033-
} else {
1034-
docIdAndVersion = VersionsAndSeqNoResolver.timeSeriesLoadDocIdAndVersion(
1035-
searcher.getIndexReader(),
1036-
op.uid(),
1037-
loadSeqNo
1038-
);
1024+
final VersionsAndSeqNoResolver.DocIdAndVersion docIdAndVersion = performActionWithDirectoryReader(
1025+
SearcherScope.INTERNAL,
1026+
directoryReader -> {
1027+
if (engineConfig.getIndexSettings().getMode() == IndexMode.TIME_SERIES) {
1028+
assert engineConfig.getLeafSorter() == DataStream.TIMESERIES_LEAF_READERS_SORTER;
1029+
return VersionsAndSeqNoResolver.timeSeriesLoadDocIdAndVersion(directoryReader, op.uid(), op.id(), loadSeqNo);
1030+
} else {
1031+
return VersionsAndSeqNoResolver.timeSeriesLoadDocIdAndVersion(directoryReader, op.uid(), loadSeqNo);
1032+
}
10391033
}
1040-
}
1034+
);
10411035
if (docIdAndVersion != null) {
10421036
versionValue = new IndexVersionValue(null, docIdAndVersion.version, docIdAndVersion.seqNo, docIdAndVersion.primaryTerm);
10431037
}
@@ -3470,4 +3464,26 @@ public LiveVersionMap getLiveVersionMap() {
34703464
protected long getPreCommitSegmentGeneration() {
34713465
return preCommitSegmentGeneration.get();
34723466
}
3467+
3468+
<T> T performActionWithDirectoryReader(SearcherScope scope, CheckedFunction<DirectoryReader, T, IOException> action)
3469+
throws EngineException {
3470+
assert scope == SearcherScope.INTERNAL : "performActionWithDirectoryReader(...) isn't prepared for external usage";
3471+
assert store.hasReferences();
3472+
try {
3473+
ReferenceManager<ElasticsearchDirectoryReader> referenceManager = getReferenceManager(scope);
3474+
ElasticsearchDirectoryReader acquire = referenceManager.acquire();
3475+
try {
3476+
return action.apply(acquire);
3477+
} finally {
3478+
referenceManager.release(acquire);
3479+
}
3480+
} catch (AlreadyClosedException ex) {
3481+
throw ex;
3482+
} catch (Exception ex) {
3483+
maybeFailEngine("perform_action_directory_reader", ex);
3484+
ensureOpen(ex); // throw EngineCloseException here if we are already closed
3485+
logger.error("failed to perform action with directory reader", ex);
3486+
throw new EngineException(shardId, "failed to perform action with directory reader", ex);
3487+
}
3488+
}
34733489
}

x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/ConfigurationTestUtils.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,8 @@ public static Configuration randomConfiguration(String query, Map<String, Map<St
7171
query,
7272
profile,
7373
tables,
74-
System.nanoTime()
74+
System.nanoTime(),
75+
false
7576
);
7677
}
7778

x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/EsqlTestUtils.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -391,7 +391,8 @@ public static Configuration configuration(QueryPragmas pragmas, String query) {
391391
query,
392392
false,
393393
TABLES,
394-
System.nanoTime()
394+
System.nanoTime(),
395+
false
395396
);
396397
}
397398

x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlActionBreakerIT.java

Lines changed: 21 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -95,15 +95,30 @@ private EsqlQueryResponse runWithBreaking(EsqlQueryRequest request) throws Circu
9595

9696
@Override
9797
protected EsqlQueryResponse run(EsqlQueryRequest request) {
98+
if (randomBoolean()) {
99+
request.allowPartialResults(randomBoolean());
100+
}
101+
Exception failure = null;
98102
try {
99-
return runWithBreaking(request);
100-
} catch (Exception e) {
101-
try (EsqlQueryResponse resp = super.run(request)) {
102-
assertThat(e, instanceOf(CircuitBreakingException.class));
103-
assertThat(ExceptionsHelper.status(e), equalTo(RestStatus.TOO_MANY_REQUESTS));
104-
resp.incRef();
103+
final EsqlQueryResponse resp = runWithBreaking(request);
104+
if (resp.isPartial() == false) {
105105
return resp;
106106
}
107+
try (resp) {
108+
assertTrue(request.allowPartialResults());
109+
}
110+
} catch (Exception e) {
111+
failure = e;
112+
}
113+
// Re-run if the previous query failed or returned partial results
114+
// Only check the previous failure if the second query succeeded
115+
try (EsqlQueryResponse resp = super.run(request)) {
116+
if (failure != null) {
117+
assertThat(failure, instanceOf(CircuitBreakingException.class));
118+
assertThat(ExceptionsHelper.status(failure), equalTo(RestStatus.TOO_MANY_REQUESTS));
119+
}
120+
resp.incRef();
121+
return resp;
107122
}
108123
}
109124

x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlDisruptionIT.java

Lines changed: 44 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -83,38 +83,58 @@ private EsqlQueryResponse runQueryWithDisruption(EsqlQueryRequest request) {
8383
logger.info("--> start disruption scheme [{}]", disruptionScheme);
8484
disruptionScheme.startDisrupting();
8585
logger.info("--> executing esql query with disruption {} ", request.query());
86+
if (randomBoolean()) {
87+
request.allowPartialResults(randomBoolean());
88+
}
8689
ActionFuture<EsqlQueryResponse> future = client().execute(EsqlQueryAction.INSTANCE, request);
90+
EsqlQueryResponse resp = null;
8791
try {
88-
return future.actionGet(2, TimeUnit.MINUTES);
92+
resp = future.actionGet(2, TimeUnit.MINUTES);
93+
if (resp.isPartial() == false) {
94+
return resp;
95+
}
8996
} catch (Exception ignored) {
9097

9198
} finally {
9299
clearDisruption();
93100
}
94-
try {
95-
return future.actionGet(2, TimeUnit.MINUTES);
96-
} catch (Exception e) {
97-
logger.info(
98-
"running tasks: {}",
99-
client().admin()
100-
.cluster()
101-
.prepareListTasks()
102-
.get()
103-
.getTasks()
104-
.stream()
105-
.filter(
106-
// Skip the tasks we that'd get in the way while debugging
107-
t -> false == t.action().contains(TransportListTasksAction.TYPE.name())
108-
&& false == t.action().contains(HealthNode.TASK_NAME)
109-
)
110-
.toList()
111-
);
112-
assertTrue("request must be failed or completed after clearing disruption", future.isDone());
113-
ensureBlocksReleased();
114-
logger.info("--> failed to execute esql query with disruption; retrying...", e);
115-
EsqlTestUtils.assertEsqlFailure(e);
116-
return client().execute(EsqlQueryAction.INSTANCE, request).actionGet(2, TimeUnit.MINUTES);
101+
// wait for the response after clear disruption
102+
if (resp == null) {
103+
try {
104+
resp = future.actionGet(2, TimeUnit.MINUTES);
105+
} catch (Exception e) {
106+
logger.info(
107+
"running tasks: {}",
108+
client().admin()
109+
.cluster()
110+
.prepareListTasks()
111+
.get()
112+
.getTasks()
113+
.stream()
114+
.filter(
115+
// Skip the tasks we that'd get in the way while debugging
116+
t -> false == t.action().contains(TransportListTasksAction.TYPE.name())
117+
&& false == t.action().contains(HealthNode.TASK_NAME)
118+
)
119+
.toList()
120+
);
121+
assertTrue("request must be failed or completed after clearing disruption", future.isDone());
122+
ensureBlocksReleased();
123+
logger.info("--> failed to execute esql query with disruption; retrying...", e);
124+
EsqlTestUtils.assertEsqlFailure(e);
125+
}
126+
}
127+
// use the response if it's not partial
128+
if (resp != null) {
129+
if (resp.isPartial() == false) {
130+
return resp;
131+
}
132+
try (var ignored = resp) {
133+
assertTrue(request.allowPartialResults());
134+
}
117135
}
136+
// re-run the query
137+
return super.run(request);
118138
}
119139

120140
private ServiceDisruptionScheme addRandomDisruptionScheme() {

x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlNodeFailureIT.java

Lines changed: 58 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,17 @@
1616
import org.elasticsearch.test.FailingFieldPlugin;
1717
import org.elasticsearch.xcontent.XContentBuilder;
1818
import org.elasticsearch.xcontent.json.JsonXContent;
19+
import org.elasticsearch.xpack.esql.EsqlTestUtils;
1920

20-
import java.io.IOException;
2121
import java.util.ArrayList;
2222
import java.util.Collection;
23+
import java.util.HashSet;
2324
import java.util.List;
25+
import java.util.Set;
2426

2527
import static org.hamcrest.Matchers.equalTo;
28+
import static org.hamcrest.Matchers.in;
29+
import static org.hamcrest.Matchers.lessThanOrEqualTo;
2630

2731
/**
2832
* Make sure the failures on the data node come back as failures over the wire.
@@ -48,10 +52,7 @@ protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) {
4852
return settings;
4953
}
5054

51-
/**
52-
* Use a runtime field that fails when loading field values to fail the entire query.
53-
*/
54-
public void testFailureLoadingFields() throws IOException {
55+
public Set<String> populateIndices() throws Exception {
5556
XContentBuilder mapping = JsonXContent.contentBuilder().startObject();
5657
mapping.startObject("runtime");
5758
{
@@ -63,17 +64,62 @@ public void testFailureLoadingFields() throws IOException {
6364
mapping.endObject();
6465
}
6566
mapping.endObject();
66-
client().admin().indices().prepareCreate("fail").setSettings(indexSettings(1, 0)).setMapping(mapping.endObject()).get();
67-
68-
int docCount = 50;
69-
List<IndexRequestBuilder> docs = new ArrayList<>(docCount);
70-
for (int d = 0; d < docCount; d++) {
71-
docs.add(client().prepareIndex("ok").setSource("foo", d));
67+
client().admin().indices().prepareCreate("fail").setMapping(mapping.endObject()).get();
68+
int okCount = between(1, 50);
69+
Set<String> okIds = new HashSet<>();
70+
List<IndexRequestBuilder> docs = new ArrayList<>(okCount);
71+
for (int d = 0; d < okCount; d++) {
72+
String id = "ok-" + d;
73+
okIds.add(id);
74+
docs.add(client().prepareIndex("ok").setId(id).setSource("foo", d));
75+
}
76+
int failCount = between(1, 50);
77+
for (int d = 0; d < failCount; d++) {
78+
docs.add(client().prepareIndex("fail").setId("fail-" + d).setSource("foo", d));
7279
}
73-
docs.add(client().prepareIndex("fail").setSource("foo", 0));
7480
indexRandom(true, docs);
81+
return okIds;
82+
}
7583

84+
/**
85+
* Use a runtime field that fails when loading field values to fail the entire query.
86+
*/
87+
public void testFailureLoadingFields() throws Exception {
88+
populateIndices();
7689
IllegalStateException e = expectThrows(IllegalStateException.class, () -> run("FROM fail,ok | LIMIT 100").close());
7790
assertThat(e.getMessage(), equalTo("Accessing failing field"));
7891
}
92+
93+
public void testPartialResults() throws Exception {
94+
Set<String> okIds = populateIndices();
95+
{
96+
EsqlQueryRequest request = new EsqlQueryRequest();
97+
request.query("FROM fail,ok | LIMIT 100");
98+
request.allowPartialResults(true);
99+
request.pragmas(randomPragmas());
100+
try (EsqlQueryResponse resp = run(request)) {
101+
assertTrue(resp.isPartial());
102+
List<List<Object>> rows = EsqlTestUtils.getValuesList(resp);
103+
assertThat(rows.size(), lessThanOrEqualTo(okIds.size()));
104+
}
105+
}
106+
{
107+
EsqlQueryRequest request = new EsqlQueryRequest();
108+
request.query("FROM fail,ok METADATA _id | KEEP _id, fail_me | LIMIT 100");
109+
request.allowPartialResults(true);
110+
request.pragmas(randomPragmas());
111+
try (EsqlQueryResponse resp = run(request)) {
112+
assertTrue(resp.isPartial());
113+
List<List<Object>> rows = EsqlTestUtils.getValuesList(resp);
114+
assertThat(rows.size(), lessThanOrEqualTo(okIds.size()));
115+
Set<String> actualIds = new HashSet<>();
116+
for (List<Object> row : rows) {
117+
assertThat(row.size(), equalTo(2));
118+
String id = (String) row.getFirst();
119+
assertThat(id, in(okIds));
120+
assertTrue(actualIds.add(id));
121+
}
122+
}
123+
}
124+
}
79125
}

0 commit comments

Comments
 (0)