Skip to content
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/122890.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 122890
summary: Introduce `allow_partial_results` setting in ES|QL
area: ES|QL
type: enhancement
issues: []
25 changes: 25 additions & 0 deletions test/external-modules/failing-field/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

apply plugin: 'elasticsearch.internal-java-rest-test'


tasks.named('javaRestTest') {
usesDefaultDistribution()
it.onlyIf("snapshot build") { buildParams.isSnapshotBuild() }
}

dependencies {
api project(':test:framework')
}

esplugin {
description = 'A test module that includes runtime fields which throw exceptions when accessed'
classname ='org.elasticsearch.test.FailingFieldPlugin'
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.test.failingfield;

import org.apache.http.util.EntityUtils;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.ResponseException;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.test.cluster.ElasticsearchCluster;
import org.elasticsearch.test.cluster.local.distribution.DistributionType;
import org.elasticsearch.test.rest.ESRestTestCase;
import org.junit.ClassRule;

import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasSize;

public class EsqlPartialResultsIT extends ESRestTestCase {
@ClassRule
public static ElasticsearchCluster cluster = ElasticsearchCluster.local()
.distribution(DistributionType.DEFAULT)
.module("test-failing-field")
.setting("xpack.security.enabled", "false")
.setting("xpack.license.self_generated.type", "trial")
.setting("esql.query.allow_partial_results", "true")
.build();

@Override
protected String getTestRestCluster() {
return cluster.getHttpAddresses();
}

public Set<String> populateIndices() throws Exception {
int nextId = 0;
{
createIndex("failing-index", Settings.EMPTY, """
{
"runtime": {
"fail_me": {
"type": "long",
"script": {
"source": "",
"lang": "failing_field"
}
}
},
"properties": {
"v": {
"type": "long"
}
}
}
""");
int numDocs = between(1, 50);
for (int i = 0; i < numDocs; i++) {
String id = Integer.toString(nextId++);
Request doc = new Request("PUT", "failing-index/_doc/" + id);
doc.setJsonEntity("{\"v\": " + id + "}");
client().performRequest(doc);
}

}
Set<String> okIds = new HashSet<>();
{
createIndex("ok-index", Settings.EMPTY, """
{
"properties": {
"v": {
"type": "long"
}
}
}
""");
int numDocs = between(1, 50);
for (int i = 0; i < numDocs; i++) {
String id = Integer.toString(nextId++);
okIds.add(id);
Request doc = new Request("PUT", "ok-index/_doc/" + id);
doc.setJsonEntity("{\"v\": " + id + "}");
client().performRequest(doc);
}
}
refresh(client(), "failing-index,ok-index");
return okIds;
}

public void testPartialResult() throws Exception {
Set<String> okIds = populateIndices();
String query = """
{
"query": "FROM ok-index,failing-index | LIMIT 100 | KEEP fail_me,v"
}
""";
// allow_partial_results = true
{
Request request = new Request("POST", "/_query");
request.setJsonEntity(query);
if (randomBoolean()) {
request.addParameter("allow_partial_results", "true");
}
Response resp = client().performRequest(request);
Map<String, Object> results = entityAsMap(resp);
assertThat(results.get("is_partial"), equalTo(true));
List<?> columns = (List<?>) results.get("columns");
assertThat(columns, equalTo(List.of(Map.of("name", "fail_me", "type", "long"), Map.of("name", "v", "type", "long"))));
List<?> values = (List<?>) results.get("values");
assertThat(values, hasSize(okIds.size()));
}
// allow_partial_results = false
{
Request request = new Request("POST", "/_query");
request.setJsonEntity("""
{
"query": "FROM ok-index,failing-index | LIMIT 100"
}
""");
request.addParameter("allow_partial_results", "false");
var error = expectThrows(ResponseException.class, () -> client().performRequest(request));
Response resp = error.getResponse();
assertThat(resp.getStatusLine().getStatusCode(), equalTo(500));
assertThat(EntityUtils.toString(resp.getEntity()), containsString("Accessing failing field"));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,4 +39,6 @@ public final ActionType<Response> action() {

public abstract EsqlQueryRequestBuilder<Request, Response> filter(QueryBuilder filter);

public abstract EsqlQueryRequestBuilder<Request, Response> allowPartialResults(boolean allowPartialResults);

}
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ public static class RequestObjectBuilder {
private Boolean includeCCSMetadata = null;

private CheckedConsumer<XContentBuilder, IOException> filter;
private Boolean allPartialResults = null;

public RequestObjectBuilder() throws IOException {
this(randomFrom(XContentType.values()));
Expand Down Expand Up @@ -204,6 +205,11 @@ public RequestObjectBuilder filter(CheckedConsumer<XContentBuilder, IOException>
return this;
}

public RequestObjectBuilder allPartialResults(boolean allPartialResults) {
this.allPartialResults = allPartialResults;
return this;
}

public RequestObjectBuilder build() throws IOException {
if (isBuilt == false) {
if (tables != null) {
Expand Down Expand Up @@ -1151,6 +1157,9 @@ static Request prepareRequestWithOptions(RequestObjectBuilder requestObject, Mod
requestObject.build();
Request request = prepareRequest(mode);
String mediaType = attachBody(requestObject, request);
if (requestObject.allPartialResults != null) {
request.addParameter("allow_partial_results", String.valueOf(requestObject.allPartialResults));
}

RequestOptions.Builder options = request.getOptions().toBuilder();
options.setWarningsHandler(WarningsHandler.PERMISSIVE); // We assert the warnings ourselves
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,15 @@
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xcontent.json.JsonXContent;
import org.elasticsearch.xpack.esql.EsqlTestUtils;
import org.elasticsearch.xpack.esql.plugin.EsqlPlugin;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.in;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
Expand Down Expand Up @@ -122,4 +124,46 @@ public void testPartialResults() throws Exception {
}
}
}

public void testDefaultPartialResults() throws Exception {
Set<String> okIds = populateIndices();
assertAcked(
client().admin()
.cluster()
.prepareUpdateSettings(TimeValue.THIRTY_SECONDS, TimeValue.THIRTY_SECONDS)
.setPersistentSettings(Settings.builder().put(EsqlPlugin.QUERY_ALLOW_PARTIAL_RESULTS.getKey(), true))
);
try {
// allow_partial_results = default
{
EsqlQueryRequest request = new EsqlQueryRequest();
request.query("FROM fail,ok | LIMIT 100");
request.pragmas(randomPragmas());
if (randomBoolean()) {
request.allowPartialResults(true);
}
try (EsqlQueryResponse resp = run(request)) {
assertTrue(resp.isPartial());
List<List<Object>> rows = EsqlTestUtils.getValuesList(resp);
assertThat(rows.size(), lessThanOrEqualTo(okIds.size()));
}
}
// allow_partial_results = false
{
EsqlQueryRequest request = new EsqlQueryRequest();
request.query("FROM fail,ok | LIMIT 100");
request.pragmas(randomPragmas());
request.allowPartialResults(false);
IllegalStateException e = expectThrows(IllegalStateException.class, () -> run(request).close());
assertThat(e.getMessage(), equalTo("Accessing failing field"));
}
} finally {
assertAcked(
client().admin()
.cluster()
.prepareUpdateSettings(TimeValue.THIRTY_SECONDS, TimeValue.THIRTY_SECONDS)
.setPersistentSettings(Settings.builder().putNull(EsqlPlugin.QUERY_ALLOW_PARTIAL_RESULTS.getKey()))
);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public class EsqlQueryRequest extends org.elasticsearch.xpack.core.esql.action.E
private boolean keepOnCompletion;
private boolean onSnapshotBuild = Build.current().isSnapshot();
private boolean acceptedPragmaRisks = false;
private boolean allowPartialResults = false;
private Boolean allowPartialResults = null;

/**
* "Tables" provided in the request for use with things like {@code LOOKUP}.
Expand Down Expand Up @@ -232,12 +232,13 @@ public Map<String, Map<String, Column>> tables() {
return tables;
}

public boolean allowPartialResults() {
public Boolean allowPartialResults() {
return allowPartialResults;
}

public void allowPartialResults(boolean allowPartialResults) {
public EsqlQueryRequest allowPartialResults(boolean allowPartialResults) {
this.allowPartialResults = allowPartialResults;
return this;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,12 @@ public EsqlQueryRequestBuilder keepOnCompletion(boolean keepOnCompletion) {
return this;
}

@Override
public EsqlQueryRequestBuilder allowPartialResults(boolean allowPartialResults) {
request.allowPartialResults(allowPartialResults);
return this;
}

static { // plumb access from x-pack core
SharedSecrets.setEsqlQueryRequestBuilderAccess(EsqlQueryRequestBuilder::newSyncEsqlQueryRequestBuilder);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,6 @@ String fields() {
static final ParseField WAIT_FOR_COMPLETION_TIMEOUT = new ParseField("wait_for_completion_timeout");
static final ParseField KEEP_ALIVE = new ParseField("keep_alive");
static final ParseField KEEP_ON_COMPLETION = new ParseField("keep_on_completion");
static final ParseField ALLOW_PARTIAL_RESULTS = new ParseField("allow_partial_results");

private static final ObjectParser<EsqlQueryRequest, Void> SYNC_PARSER = objectParserSync(EsqlQueryRequest::syncEsqlQueryRequest);
private static final ObjectParser<EsqlQueryRequest, Void> ASYNC_PARSER = objectParserAsync(EsqlQueryRequest::asyncEsqlQueryRequest);
Expand Down Expand Up @@ -115,7 +114,6 @@ private static void objectParserCommon(ObjectParser<EsqlQueryRequest, ?> parser)
parser.declareString((request, localeTag) -> request.locale(Locale.forLanguageTag(localeTag)), LOCALE_FIELD);
parser.declareBoolean(EsqlQueryRequest::profile, PROFILE_FIELD);
parser.declareField((p, r, c) -> new ParseTables(r, p).parseTables(), TABLES_FIELD, ObjectParser.ValueType.OBJECT);
parser.declareBoolean(EsqlQueryRequest::allowPartialResults, ALLOW_PARTIAL_RESULTS);
}

private static ObjectParser<EsqlQueryRequest, Void> objectParserSync(Supplier<EsqlQueryRequest> supplier) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,10 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli
}

protected static RestChannelConsumer restChannelConsumer(EsqlQueryRequest esqlRequest, RestRequest request, NodeClient client) {
final Boolean partialResults = request.paramAsBoolean("allow_partial_results", null);
if (partialResults != null) {
esqlRequest.allowPartialResults(partialResults);
}
LOGGER.debug("Beginning execution of ESQL query.\nQuery string: [{}]", esqlRequest.query());

return channel -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,13 @@ public class EsqlPlugin extends Plugin implements ActionPlugin {
Setting.Property.Dynamic
);

public static final Setting<Boolean> QUERY_ALLOW_PARTIAL_RESULTS = Setting.boolSetting(
"esql.query.allow_partial_results",
false,
Setting.Property.NodeScope,
Setting.Property.Dynamic
);

@Override
public Collection<?> createComponents(PluginServices services) {
CircuitBreaker circuitBreaker = services.indicesService().getBigArrays().breakerService().getBreaker("request");
Expand Down Expand Up @@ -151,7 +158,7 @@ protected XPackLicenseState getLicenseState() {
*/
@Override
public List<Setting<?>> getSettings() {
return List.of(QUERY_RESULT_TRUNCATION_DEFAULT_SIZE, QUERY_RESULT_TRUNCATION_MAX_SIZE);
return List.of(QUERY_RESULT_TRUNCATION_DEFAULT_SIZE, QUERY_RESULT_TRUNCATION_MAX_SIZE, QUERY_ALLOW_PARTIAL_RESULTS);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ public class TransportEsqlQueryAction extends HandledTransportAction<EsqlQueryRe
private final RemoteClusterService remoteClusterService;
private final UsageService usageService;
private final TransportActionServices services;
private volatile boolean defaultAllowPartialResults;

@Inject
@SuppressWarnings("this-escape")
Expand Down Expand Up @@ -158,6 +159,9 @@ public TransportEsqlQueryAction(
indexNameExpressionResolver,
usageService
);
defaultAllowPartialResults = EsqlPlugin.QUERY_ALLOW_PARTIAL_RESULTS.get(clusterService.getSettings());
clusterService.getClusterSettings()
.addSettingsUpdateConsumer(EsqlPlugin.QUERY_ALLOW_PARTIAL_RESULTS, v -> defaultAllowPartialResults = v);
}

@Override
Expand Down Expand Up @@ -194,6 +198,9 @@ public void execute(EsqlQueryRequest request, EsqlQueryTask task, ActionListener
}

private void innerExecute(Task task, EsqlQueryRequest request, ActionListener<EsqlQueryResponse> listener) {
if (request.allowPartialResults() == null) {
request.allowPartialResults(defaultAllowPartialResults);
}
Configuration configuration = new Configuration(
ZoneOffset.UTC,
request.locale() != null ? request.locale() : Locale.US,
Expand Down