diff --git a/docs/changelog/124832.yaml b/docs/changelog/124832.yaml new file mode 100644 index 0000000000000..dd471e265c612 --- /dev/null +++ b/docs/changelog/124832.yaml @@ -0,0 +1,6 @@ +pr: 124832 +summary: List/get query API +area: ES|QL +type: feature +issues: + - 124827 diff --git a/docs/reference/elasticsearch/security-privileges.md b/docs/reference/elasticsearch/security-privileges.md index 029fba380499c..15434a3336277 100644 --- a/docs/reference/elasticsearch/security-privileges.md +++ b/docs/reference/elasticsearch/security-privileges.md @@ -194,6 +194,9 @@ This section lists the privileges that you can assign to a role. `monitor_enrich` : All read-only operations related to managing and executing enrich policies. +`monitor_esql` +: All read-only operations related to ES|QL queries. + `monitor_inference` : All read-only operations related to {{infer}}. diff --git a/rest-api-spec/src/main/resources/rest-api-spec/api/esql.get_query.json b/rest-api-spec/src/main/resources/rest-api-spec/api/esql.get_query.json new file mode 100644 index 0000000000000..a0e78cf4b1b74 --- /dev/null +++ b/rest-api-spec/src/main/resources/rest-api-spec/api/esql.get_query.json @@ -0,0 +1,32 @@ +{ + "esql.get_query": { + "documentation": { + "url": null, + "description": "Executes a get ESQL query request" + }, + "stability": "experimental", + "visibility": "public", + "headers": { + "accept": [], + "content_type": [ + "application/json" + ] + }, + "url": { + "paths": [ + { + "path": "/_query/queries/{id}", + "methods": [ + "GET" + ], + "parts": { + "id": { + "type": "string", + "description": "The query ID" + } + } + } + ] + } + } +} diff --git a/rest-api-spec/src/main/resources/rest-api-spec/api/esql.list_queries.json b/rest-api-spec/src/main/resources/rest-api-spec/api/esql.list_queries.json new file mode 100644 index 0000000000000..472e70a8766e2 --- /dev/null +++ b/rest-api-spec/src/main/resources/rest-api-spec/api/esql.list_queries.json @@ -0,0 +1,26 @@ +{ + "esql.list_queries": { + "documentation": { + "url": null, + "description": "Executes a list ESQL queries request" + }, + "stability": "experimental", + "visibility": "public", + "headers": { + "accept": [], + "content_type": [ + "application/json" + ] + }, + "url": { + "paths": [ + { + "path": "/_query/queries", + "methods": [ + "GET" + ] + } + ] + } + } +} diff --git a/test/framework/src/main/java/org/elasticsearch/test/IntOrLongMatcher.java b/test/framework/src/main/java/org/elasticsearch/test/IntOrLongMatcher.java new file mode 100644 index 0000000000000..f0b94fc4d07ab --- /dev/null +++ b/test/framework/src/main/java/org/elasticsearch/test/IntOrLongMatcher.java @@ -0,0 +1,79 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.test; + +import org.hamcrest.BaseMatcher; +import org.hamcrest.Description; +import org.hamcrest.Matcher; + +import static org.hamcrest.Matchers.anyOf; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.isA; + +/** + * A type-agnostic way of comparing integer values, not caring if it's a long or an integer. + */ +public abstract sealed class IntOrLongMatcher extends BaseMatcher { + public static IntOrLongMatcher matches(int expected) { + return new IntMatcher(expected); + } + + public static IntOrLongMatcher matches(long expected) { + return new LongMatcher(expected); + } + + private static final class IntMatcher extends IntOrLongMatcher { + private final int expected; + + private IntMatcher(int expected) { + this.expected = expected; + } + + @Override + public boolean matches(Object o) { + return switch (o) { + case Integer i -> expected == i; + case Long l -> expected == l; + default -> false; + }; + } + + @Override + public void describeTo(Description description) { + equalTo(expected).describeTo(description); + } + } + + private static final class LongMatcher extends IntOrLongMatcher { + private final long expected; + + LongMatcher(long expected) { + this.expected = expected; + } + + @Override + public boolean matches(Object o) { + return switch (o) { + case Integer i -> expected == i; + case Long l -> expected == l; + default -> false; + }; + } + + @Override + public void describeTo(Description description) { + equalTo(expected).describeTo(description); + } + } + + public static Matcher isIntOrLong() { + return anyOf(isA(Integer.class), isA(Long.class)); + } +} diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ClientHelper.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ClientHelper.java index 9a0d1a58a30a1..3577b1d834f8d 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ClientHelper.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ClientHelper.java @@ -196,6 +196,7 @@ private static String maybeRewriteSingleAuthenticationHeaderForVersion( public static final String APM_ORIGIN = "apm"; public static final String OTEL_ORIGIN = "otel"; public static final String REINDEX_DATA_STREAM_ORIGIN = "reindex_data_stream"; + public static final String ESQL_ORIGIN = "esql"; private ClientHelper() {} diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/authz/privilege/ClusterPrivilegeResolver.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/authz/privilege/ClusterPrivilegeResolver.java index 00d45fb135fb2..5ad40afa3c86e 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/authz/privilege/ClusterPrivilegeResolver.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/authz/privilege/ClusterPrivilegeResolver.java @@ -110,6 +110,7 @@ public class ClusterPrivilegeResolver { private static final Set MONITOR_WATCHER_PATTERN = Set.of("cluster:monitor/xpack/watcher/*"); private static final Set MONITOR_ROLLUP_PATTERN = Set.of("cluster:monitor/xpack/rollup/*"); private static final Set MONITOR_ENRICH_PATTERN = Set.of("cluster:monitor/xpack/enrich/*", "cluster:admin/xpack/enrich/get"); + private static final Set MONITOR_ESQL_PATTERN = Set.of("cluster:monitor/xpack/esql/*"); // intentionally cluster:monitor/stats* to match cluster:monitor/stats, cluster:monitor/stats[n] and cluster:monitor/stats/remote private static final Set MONITOR_STATS_PATTERN = Set.of("cluster:monitor/stats*"); @@ -249,6 +250,7 @@ public class ClusterPrivilegeResolver { public static final NamedClusterPrivilege MONITOR_WATCHER = new ActionClusterPrivilege("monitor_watcher", MONITOR_WATCHER_PATTERN); public static final NamedClusterPrivilege MONITOR_ROLLUP = new ActionClusterPrivilege("monitor_rollup", MONITOR_ROLLUP_PATTERN); public static final NamedClusterPrivilege MONITOR_ENRICH = new ActionClusterPrivilege("monitor_enrich", MONITOR_ENRICH_PATTERN); + public static final NamedClusterPrivilege MONITOR_ESQL = new ActionClusterPrivilege("monitor_esql", MONITOR_ESQL_PATTERN); public static final NamedClusterPrivilege MONITOR_STATS = new ActionClusterPrivilege("monitor_stats", MONITOR_STATS_PATTERN); public static final NamedClusterPrivilege MANAGE = new ActionClusterPrivilege("manage", ALL_CLUSTER_PATTERN, ALL_SECURITY_PATTERN); public static final NamedClusterPrivilege MANAGE_INFERENCE = new ActionClusterPrivilege("manage_inference", MANAGE_INFERENCE_PATTERN); @@ -431,6 +433,7 @@ public class ClusterPrivilegeResolver { MONITOR_WATCHER, MONITOR_ROLLUP, MONITOR_ENRICH, + MONITOR_ESQL, MONITOR_STATS, MANAGE, MANAGE_CONNECTOR, diff --git a/x-pack/plugin/esql-core/src/main/java/org/elasticsearch/xpack/esql/core/async/AsyncTaskManagementService.java b/x-pack/plugin/esql-core/src/main/java/org/elasticsearch/xpack/esql/core/async/AsyncTaskManagementService.java index 91fdb9c39b6e3..5357f72c32710 100644 --- a/x-pack/plugin/esql-core/src/main/java/org/elasticsearch/xpack/esql/core/async/AsyncTaskManagementService.java +++ b/x-pack/plugin/esql-core/src/main/java/org/elasticsearch/xpack/esql/core/async/AsyncTaskManagementService.java @@ -172,6 +172,8 @@ public AsyncTaskManagementService( this.threadPool = threadPool; } + public static String ASYNC_ACTION_SUFFIX = "[a]"; + public void asyncExecute( Request request, TimeValue waitForCompletionTimeout, @@ -182,7 +184,7 @@ public void asyncExecute( String nodeId = clusterService.localNode().getId(); try (var ignored = threadPool.getThreadContext().newTraceContext()) { @SuppressWarnings("unchecked") - T searchTask = (T) taskManager.register("transport", action + "[a]", new AsyncRequestWrapper(request, nodeId)); + T searchTask = (T) taskManager.register("transport", action + ASYNC_ACTION_SUFFIX, new AsyncRequestWrapper(request, nodeId)); boolean operationStarted = false; try { operation.execute( diff --git a/x-pack/plugin/esql/qa/security/src/javaRestTest/java/org/elasticsearch/xpack/esql/EsqlSecurityIT.java b/x-pack/plugin/esql/qa/security/src/javaRestTest/java/org/elasticsearch/xpack/esql/EsqlSecurityIT.java index a809bd50a45b8..8f4c1f3980272 100644 --- a/x-pack/plugin/esql/qa/security/src/javaRestTest/java/org/elasticsearch/xpack/esql/EsqlSecurityIT.java +++ b/x-pack/plugin/esql/qa/security/src/javaRestTest/java/org/elasticsearch/xpack/esql/EsqlSecurityIT.java @@ -42,6 +42,7 @@ import static org.elasticsearch.test.MapMatcher.matchesMap; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.not; public class EsqlSecurityIT extends ESRestTestCase { @ClassRule @@ -69,6 +70,8 @@ public class EsqlSecurityIT extends ESRestTestCase { .user("logs_foo_after_2021", "x-pack-test-password", "logs_foo_after_2021", false) .user("logs_foo_after_2021_pattern", "x-pack-test-password", "logs_foo_after_2021_pattern", false) .user("logs_foo_after_2021_alias", "x-pack-test-password", "logs_foo_after_2021_alias", false) + .user("user_without_monitor_privileges", "x-pack-test-password", "user_without_monitor_privileges", false) + .user("user_with_monitor_privileges", "x-pack-test-password", "user_with_monitor_privileges", false) .build(); @Override @@ -309,7 +312,7 @@ public void testIndexPatternErrorMessageComparison_ESQL_SearchDSL() throws Excep json.endObject(); Request searchRequest = new Request("GET", "/index-user1,index-user2/_search"); searchRequest.setJsonEntity(Strings.toString(json)); - searchRequest.setOptions(RequestOptions.DEFAULT.toBuilder().addHeader("es-security-runas-user", "metadata1_read2")); + setUser(searchRequest, "metadata1_read2"); // ES|QL query on the same index pattern var esqlResp = expectThrows(ResponseException.class, () -> runESQLCommand("metadata1_read2", "FROM index-user1,index-user2")); @@ -429,13 +432,13 @@ public void testFieldLevelSecurityAllow() throws Exception { public void testFieldLevelSecurityAllowPartial() throws Exception { Request request = new Request("GET", "/index*/_field_caps"); - request.setOptions(RequestOptions.DEFAULT.toBuilder().addHeader("es-security-runas-user", "fls_user")); + setUser(request, "fls_user"); request.addParameter("error_trace", "true"); request.addParameter("pretty", "true"); request.addParameter("fields", "*"); request = new Request("GET", "/index*/_search"); - request.setOptions(RequestOptions.DEFAULT.toBuilder().addHeader("es-security-runas-user", "fls_user")); + setUser(request, "fls_user"); request.addParameter("error_trace", "true"); request.addParameter("pretty", "true"); @@ -761,6 +764,36 @@ public void testFromLookupIndexForbidden() throws Exception { assertThat(resp.getResponse().getStatusLine().getStatusCode(), equalTo(HttpStatus.SC_BAD_REQUEST)); } + public void testListQueryAllowed() throws Exception { + Request request = new Request("GET", "_query/queries"); + setUser(request, "user_with_monitor_privileges"); + var resp = client().performRequest(request); + assertOK(resp); + } + + public void testListQueryForbidden() throws Exception { + Request request = new Request("GET", "_query/queries"); + setUser(request, "user_without_monitor_privileges"); + var resp = expectThrows(ResponseException.class, () -> client().performRequest(request)); + assertThat(resp.getResponse().getStatusLine().getStatusCode(), equalTo(403)); + assertThat(resp.getMessage(), containsString("this action is granted by the cluster privileges [monitor_esql,monitor,manage,all]")); + } + + public void testGetQueryAllowed() throws Exception { + // This is a bit tricky, since there is no such running query. We just make sure it didn't fail on forbidden privileges. + Request request = new Request("GET", "_query/queries/foo:1234"); + var resp = expectThrows(ResponseException.class, () -> client().performRequest(request)); + assertThat(resp.getResponse().getStatusLine().getStatusCode(), not(equalTo(404))); + } + + public void testGetQueryForbidden() throws Exception { + Request request = new Request("GET", "_query/queries/foo:1234"); + setUser(request, "user_without_monitor_privileges"); + var resp = expectThrows(ResponseException.class, () -> client().performRequest(request)); + assertThat(resp.getResponse().getStatusLine().getStatusCode(), equalTo(403)); + assertThat(resp.getMessage(), containsString("this action is granted by the cluster privileges [monitor_esql,monitor,manage,all]")); + } + private void createEnrichPolicy() throws Exception { createIndex("songs", Settings.EMPTY, """ "properties":{"song_id": {"type": "keyword"}, "title": {"type": "keyword"}, "artist": {"type": "keyword"} } @@ -837,11 +870,16 @@ protected Response runESQLCommand(String user, String command) throws IOExceptio json.endObject(); Request request = new Request("POST", "_query"); request.setJsonEntity(Strings.toString(json)); - request.setOptions(RequestOptions.DEFAULT.toBuilder().addHeader("es-security-runas-user", user)); + setUser(request, user); request.addParameter("error_trace", "true"); return client().performRequest(request); } + private static void setUser(Request request, String user) { + request.setOptions(RequestOptions.DEFAULT.toBuilder().addHeader("es-security-runas-user", user)); + + } + static void addRandomPragmas(XContentBuilder builder) throws IOException { if (Build.current().isSnapshot()) { Settings pragmas = randomPragmas(); @@ -853,7 +891,7 @@ static void addRandomPragmas(XContentBuilder builder) throws IOException { } } - static Settings randomPragmas() { + private static Settings randomPragmas() { Settings.Builder settings = Settings.builder(); if (randomBoolean()) { settings.put("page_size", between(1, 5)); diff --git a/x-pack/plugin/esql/qa/security/src/javaRestTest/resources/roles.yml b/x-pack/plugin/esql/qa/security/src/javaRestTest/resources/roles.yml index 745ae43cf640c..f66d0b72962c1 100644 --- a/x-pack/plugin/esql/qa/security/src/javaRestTest/resources/roles.yml +++ b/x-pack/plugin/esql/qa/security/src/javaRestTest/resources/roles.yml @@ -193,3 +193,10 @@ logs_foo_after_2021_alias: "@timestamp": {"gte": "2021-01-01T00:00:00"} } } + +user_without_monitor_privileges: + cluster: [] + +user_with_monitor_privileges: + cluster: + - monitor_esql diff --git a/x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/RestEsqlTestCase.java b/x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/RestEsqlTestCase.java index d0c7168218a12..ab44347cf1e02 100644 --- a/x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/RestEsqlTestCase.java +++ b/x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/RestEsqlTestCase.java @@ -20,7 +20,6 @@ import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.io.Streams; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.core.CheckedConsumer; import org.elasticsearch.core.Nullable; import org.elasticsearch.core.TimeValue; @@ -40,7 +39,6 @@ import java.io.ByteArrayOutputStream; import java.io.IOException; -import java.io.InputStream; import java.io.InputStreamReader; import java.io.OutputStream; import java.nio.charset.StandardCharsets; @@ -1395,15 +1393,11 @@ static Map removeAsyncProperties(Map map) { } protected static Map entityToMap(HttpEntity entity, XContentType expectedContentType) throws IOException { - try (InputStream content = entity.getContent()) { - XContentType xContentType = XContentType.fromMediaType(entity.getContentType().getValue()); - assertEquals(expectedContentType, xContentType); - var map = XContentHelper.convertToMap(xContentType.xContent(), content, false); - if (shouldLog()) { - LOGGER.info("entity={}", map); - } - return map; + var result = EsqlTestUtils.entityToMap(entity, expectedContentType); + if (shouldLog()) { + LOGGER.info("entity={}", result); } + return result; } static void addAsyncParameters(RequestObjectBuilder requestObject, boolean keepOnCompletion) throws IOException { @@ -1535,21 +1529,18 @@ static String runEsqlAsTextWithFormat(RequestObjectBuilder builder, String forma } private static Request prepareRequest(Mode mode) { - Request request = new Request("POST", "/_query" + (mode == ASYNC ? "/async" : "")); - request.addParameter("error_trace", "true"); // Helps with debugging in case something crazy happens on the server. - request.addParameter("pretty", "true"); // Improves error reporting readability - return request; + return finishRequest(new Request("POST", "/_query" + (mode == ASYNC ? "/async" : ""))); } private static Request prepareAsyncGetRequest(String id) { - Request request = new Request("GET", "/_query/async/" + id + "?wait_for_completion_timeout=60s"); - request.addParameter("error_trace", "true"); // Helps with debugging in case something crazy happens on the server. - request.addParameter("pretty", "true"); // Improves error reporting readability - return request; + return finishRequest(new Request("GET", "/_query/async/" + id + "?wait_for_completion_timeout=60s")); } private static Request prepareAsyncDeleteRequest(String id) { - Request request = new Request("DELETE", "/_query/async/" + id); + return finishRequest(new Request("DELETE", "/_query/async/" + id)); + } + + private static Request finishRequest(Request request) { request.addParameter("error_trace", "true"); // Helps with debugging in case something crazy happens on the server. request.addParameter("pretty", "true"); // Improves error reporting readability return request; diff --git a/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/EsqlTestUtils.java b/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/EsqlTestUtils.java index 1bce56bdf5171..6402745d36005 100644 --- a/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/EsqlTestUtils.java +++ b/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/EsqlTestUtils.java @@ -7,6 +7,7 @@ package org.elasticsearch.xpack.esql; +import org.apache.http.HttpEntity; import org.apache.lucene.document.InetAddressPoint; import org.apache.lucene.sandbox.document.HalfFloatPoint; import org.apache.lucene.util.BytesRef; @@ -22,6 +23,7 @@ import org.elasticsearch.common.regex.Regex; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.BigArrays; +import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.compute.data.AggregateMetricDoubleBlockBuilder; import org.elasticsearch.compute.data.BlockFactory; import org.elasticsearch.compute.data.BlockUtils; @@ -41,6 +43,7 @@ import org.elasticsearch.test.ESTestCase; import org.elasticsearch.transport.RemoteTransportException; import org.elasticsearch.transport.TransportService; +import org.elasticsearch.xcontent.XContentType; import org.elasticsearch.xcontent.json.JsonXContent; import org.elasticsearch.xpack.esql.action.EsqlQueryResponse; import org.elasticsearch.xpack.esql.analysis.EnrichResolution; @@ -881,6 +884,18 @@ public static T singleValue(Collection collection) { return collection.iterator().next(); } + public static Map jsonEntityToMap(HttpEntity entity) throws IOException { + return entityToMap(entity, XContentType.JSON); + } + + public static Map entityToMap(HttpEntity entity, XContentType expectedContentType) throws IOException { + try (InputStream content = entity.getContent()) { + XContentType xContentType = XContentType.fromMediaType(entity.getContentType().getValue()); + assertEquals(expectedContentType, xContentType); + return XContentHelper.convertToMap(xContentType.xContent(), content, false /* ordered */); + } + } + /** * Errors from remotes are wrapped in RemoteException while the ones from the local cluster * aren't. This utility method is useful for unwrapping in such cases. diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/AsyncEsqlQueryActionIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/AsyncEsqlQueryActionIT.java index 41bac7f09df77..34b94207c5a8d 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/AsyncEsqlQueryActionIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/AsyncEsqlQueryActionIT.java @@ -20,6 +20,7 @@ import org.elasticsearch.xpack.core.async.DeleteAsyncResultRequest; import org.elasticsearch.xpack.core.async.GetAsyncResultRequest; import org.elasticsearch.xpack.core.async.TransportDeleteAsyncResultAction; +import org.elasticsearch.xpack.esql.core.async.AsyncTaskManagementService; import org.elasticsearch.xpack.esql.plugin.QueryPragmas; import org.hamcrest.core.IsEqual; @@ -136,7 +137,11 @@ public void testGetAsyncWhileQueryTaskIsBeingCancelled() throws Exception { .toList(); assertThat(tasks.size(), greaterThanOrEqualTo(1)); }); - client().admin().cluster().prepareCancelTasks().setActions(EsqlQueryAction.NAME + "[a]").get(); + client().admin() + .cluster() + .prepareCancelTasks() + .setActions(EsqlQueryAction.NAME + AsyncTaskManagementService.ASYNC_ACTION_SUFFIX) + .get(); assertBusy(() -> { List tasks = getEsqlQueryTasks().stream().filter(TaskInfo::cancelled).toList(); assertThat(tasks, not(empty())); diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlListQueriesActionIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlListQueriesActionIT.java new file mode 100644 index 0000000000000..7b337d4415cd6 --- /dev/null +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlListQueriesActionIT.java @@ -0,0 +1,88 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.esql.action; + +import org.elasticsearch.client.Request; +import org.elasticsearch.client.Response; +import org.elasticsearch.tasks.TaskId; +import org.elasticsearch.test.IntOrLongMatcher; +import org.elasticsearch.test.MapMatcher; +import org.elasticsearch.xpack.core.async.GetAsyncResultRequest; +import org.elasticsearch.xpack.esql.EsqlTestUtils; + +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import static org.elasticsearch.core.TimeValue.timeValueSeconds; +import static org.elasticsearch.xpack.esql.EsqlTestUtils.jsonEntityToMap; +import static org.hamcrest.Matchers.allOf; +import static org.hamcrest.Matchers.everyItem; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.isA; + +public class EsqlListQueriesActionIT extends AbstractPausableIntegTestCase { + private static final String QUERY = "from test | stats sum(pause_me)"; + + @Override + protected boolean addMockHttpTransport() { + return false; + } + + public void testNoRunningQueries() throws Exception { + var request = new Request("GET", "/_query/queries"); + var response = getRestClient().performRequest(request); + assertThat(jsonEntityToMap(response.getEntity()), is(Map.of("queries", Map.of()))); + } + + public void testRunningQueries() throws Exception { + String id = null; + try (var initialResponse = sendAsyncQuery()) { + id = initialResponse.asyncExecutionId().get(); + + var getResultsRequest = new GetAsyncResultRequest(id); + getResultsRequest.setWaitForCompletionTimeout(timeValueSeconds(1)); + client().execute(EsqlAsyncGetResultAction.INSTANCE, getResultsRequest).get().close(); + Response listResponse = getRestClient().performRequest(new Request("GET", "/_query/queries")); + @SuppressWarnings("unchecked") + var listResult = (Map>) EsqlTestUtils.singleValue( + jsonEntityToMap(listResponse.getEntity()).values() + ); + var taskId = new TaskId(EsqlTestUtils.singleValue(listResult.keySet())); + MapMatcher basicMatcher = MapMatcher.matchesMap() + .entry("node", is(taskId.getNodeId())) + .entry("id", IntOrLongMatcher.matches(taskId.getId())) + .entry("query", is(QUERY)) + .entry("start_time_millis", IntOrLongMatcher.isIntOrLong()) + .entry("running_time_nanos", IntOrLongMatcher.isIntOrLong()); + MapMatcher.assertMap(EsqlTestUtils.singleValue(listResult.values()), basicMatcher); + + Response getQueryResponse = getRestClient().performRequest(new Request("GET", "/_query/queries/" + taskId)); + MapMatcher.assertMap( + jsonEntityToMap(getQueryResponse.getEntity()), + basicMatcher.entry("coordinating_node", isA(String.class)) + .entry("data_nodes", allOf(isA(List.class), everyItem(isA(String.class)))) + ); + } finally { + if (id != null) { + // Finish the query. + scriptPermits.release(numberOfDocs()); + var getResultsRequest = new GetAsyncResultRequest(id); + getResultsRequest.setWaitForCompletionTimeout(timeValueSeconds(60)); + client().execute(EsqlAsyncGetResultAction.INSTANCE, getResultsRequest).get().close(); + } + scriptPermits.drainPermits(); + } + } + + private EsqlQueryResponse sendAsyncQuery() { + scriptPermits.drainPermits(); + scriptPermits.release(between(1, 5)); + return EsqlQueryRequestBuilder.newAsyncEsqlQueryRequestBuilder(client()).query(QUERY).execute().actionGet(60, TimeUnit.SECONDS); + } +} diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java index 3e1a9775e785d..18fe50678040f 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java @@ -954,7 +954,12 @@ public enum Cap { * the ownership of that block - but didn't account for the fact that the caller might close it, leading to double releases * in some union type queries. C.f. https://github.com/elastic/elasticsearch/issues/125850 */ - FIX_DOUBLY_RELEASED_NULL_BLOCKS_IN_VALUESOURCEREADER; + FIX_DOUBLY_RELEASED_NULL_BLOCKS_IN_VALUESOURCEREADER, + + /** + * Listing queries and getting information on a specific query. + */ + QUERY_MONITORING; private final boolean enabled; diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlGetQueryAction.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlGetQueryAction.java new file mode 100644 index 0000000000000..43942faa2e6d9 --- /dev/null +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlGetQueryAction.java @@ -0,0 +1,20 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.esql.action; + +import org.elasticsearch.action.ActionType; +import org.elasticsearch.xpack.esql.plugin.EsqlGetQueryResponse; + +public class EsqlGetQueryAction extends ActionType { + public static final EsqlGetQueryAction INSTANCE = new EsqlGetQueryAction(); + public static final String NAME = "cluster:monitor/xpack/esql/get_query"; + + private EsqlGetQueryAction() { + super(NAME); + } +} diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlGetQueryRequest.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlGetQueryRequest.java new file mode 100644 index 0000000000000..a0e5fc0c9443b --- /dev/null +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlGetQueryRequest.java @@ -0,0 +1,43 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.esql.action; + +import org.elasticsearch.action.ActionRequest; +import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.tasks.TaskId; + +import java.io.IOException; + +public class EsqlGetQueryRequest extends ActionRequest { + private final TaskId id; + + public EsqlGetQueryRequest(TaskId id) { + this.id = id; + } + + public TaskId id() { + return id; + } + + public EsqlGetQueryRequest(StreamInput streamInput) throws IOException { + super(streamInput); + id = TaskId.readFromStream(streamInput); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeWriteable(id); + } + + @Override + public ActionRequestValidationException validate() { + return null; + } +} diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlListQueriesAction.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlListQueriesAction.java new file mode 100644 index 0000000000000..ba069d88f5b07 --- /dev/null +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlListQueriesAction.java @@ -0,0 +1,20 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.esql.action; + +import org.elasticsearch.action.ActionType; +import org.elasticsearch.xpack.esql.plugin.EsqlListQueriesResponse; + +public class EsqlListQueriesAction extends ActionType { + public static final EsqlListQueriesAction INSTANCE = new EsqlListQueriesAction(); + public static final String NAME = "cluster:monitor/xpack/esql/list_queries"; + + private EsqlListQueriesAction() { + super(NAME); + } +} diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlListQueriesRequest.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlListQueriesRequest.java new file mode 100644 index 0000000000000..7ef639f4cd107 --- /dev/null +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlListQueriesRequest.java @@ -0,0 +1,27 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.esql.action; + +import org.elasticsearch.action.ActionRequest; +import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.common.io.stream.StreamInput; + +import java.io.IOException; + +public class EsqlListQueriesRequest extends ActionRequest { + public EsqlListQueriesRequest() {} + + public EsqlListQueriesRequest(StreamInput streamInput) throws IOException { + super(streamInput); + } + + @Override + public ActionRequestValidationException validate() { + return null; + } +} diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/RestEsqlListQueriesAction.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/RestEsqlListQueriesAction.java new file mode 100644 index 0000000000000..2076ae3b1aa55 --- /dev/null +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/RestEsqlListQueriesAction.java @@ -0,0 +1,53 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.esql.action; + +import org.elasticsearch.client.internal.node.NodeClient; +import org.elasticsearch.logging.LogManager; +import org.elasticsearch.logging.Logger; +import org.elasticsearch.rest.BaseRestHandler; +import org.elasticsearch.rest.RestRequest; +import org.elasticsearch.rest.Scope; +import org.elasticsearch.rest.ServerlessScope; +import org.elasticsearch.rest.action.RestToXContentListener; +import org.elasticsearch.tasks.TaskId; + +import java.io.IOException; +import java.util.List; + +import static org.elasticsearch.rest.RestRequest.Method.GET; + +@ServerlessScope(Scope.PUBLIC) +public class RestEsqlListQueriesAction extends BaseRestHandler { + private static final Logger LOGGER = LogManager.getLogger(RestEsqlListQueriesAction.class); + + @Override + public String getName() { + return "esql_list_queries"; + } + + @Override + public List routes() { + return List.of(new Route(GET, "/_query/queries/{id}"), new Route(GET, "/_query/queries")); + } + + @Override + protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException { + return restChannelConsumer(request, client); + } + + private static RestChannelConsumer restChannelConsumer(RestRequest request, NodeClient client) { + LOGGER.debug("Beginning execution of ESQL list queries."); + + String id = request.param("id"); + var action = id != null ? EsqlGetQueryAction.INSTANCE : EsqlListQueriesAction.INSTANCE; + var actionRequest = id != null ? new EsqlGetQueryRequest(new TaskId(id)) : new EsqlListQueriesRequest(); + + return channel -> client.execute(action, actionRequest, new RestToXContentListener<>(channel)); + } +} diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/EsqlGetQueryResponse.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/EsqlGetQueryResponse.java new file mode 100644 index 0000000000000..1a4b6538d1a2a --- /dev/null +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/EsqlGetQueryResponse.java @@ -0,0 +1,59 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.esql.plugin; + +import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.tasks.TaskId; +import org.elasticsearch.xcontent.ToXContentObject; +import org.elasticsearch.xcontent.XContentBuilder; + +import java.io.IOException; +import java.util.List; + +public class EsqlGetQueryResponse extends ActionResponse implements ToXContentObject { + // This is rather limited at the moment, as we don't extract information such as CPU and memory usage, owning user, etc. for the task. + public record DetailedQuery( + TaskId id, + long startTimeMillis, + long runningTimeNanos, + String query, + String coordinatingNode, + List dataNodes + ) implements ToXContentObject { + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + builder.field("id", id.getId()); + builder.field("node", id.getNodeId()); + builder.field("start_time_millis", startTimeMillis); + builder.field("running_time_nanos", runningTimeNanos); + builder.field("query", query); + builder.field("coordinating_node", coordinatingNode); + builder.field("data_nodes", dataNodes); + builder.endObject(); + return builder; + } + } + + private final DetailedQuery query; + + public EsqlGetQueryResponse(DetailedQuery query) { + this.query = query; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + throw new AssertionError("should not reach here"); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + return query.toXContent(builder, params); + } +} diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/EsqlListQueriesResponse.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/EsqlListQueriesResponse.java new file mode 100644 index 0000000000000..383dea8d82d93 --- /dev/null +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/EsqlListQueriesResponse.java @@ -0,0 +1,57 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.esql.plugin; + +import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.tasks.TaskId; +import org.elasticsearch.xcontent.ToXContentFragment; +import org.elasticsearch.xcontent.ToXContentObject; +import org.elasticsearch.xcontent.XContentBuilder; + +import java.io.IOException; +import java.util.List; + +public class EsqlListQueriesResponse extends ActionResponse implements ToXContentObject { + private final List queries; + + public record Query(TaskId taskId, long startTimeMillis, long runningTimeNanos, String query) implements ToXContentFragment { + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(taskId.toString()); + builder.field("id", taskId.getId()); + builder.field("node", taskId.getNodeId()); + builder.field("start_time_millis", startTimeMillis); + builder.field("running_time_nanos", runningTimeNanos); + builder.field("query", query); + builder.endObject(); + return builder; + } + } + + public EsqlListQueriesResponse(List queries) { + this.queries = queries; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + throw new AssertionError("should not reach here"); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + builder.startObject("queries"); + for (Query query : queries) { + query.toXContent(builder, params); + } + builder.endObject(); + builder.endObject(); + return builder; + } +} diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/EsqlPlugin.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/EsqlPlugin.java index 4bc65e1b75b53..8ff2638137e5d 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/EsqlPlugin.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/EsqlPlugin.java @@ -52,6 +52,8 @@ import org.elasticsearch.xpack.esql.EsqlUsageTransportAction; import org.elasticsearch.xpack.esql.action.EsqlAsyncGetResultAction; import org.elasticsearch.xpack.esql.action.EsqlAsyncStopAction; +import org.elasticsearch.xpack.esql.action.EsqlGetQueryAction; +import org.elasticsearch.xpack.esql.action.EsqlListQueriesAction; import org.elasticsearch.xpack.esql.action.EsqlQueryAction; import org.elasticsearch.xpack.esql.action.EsqlQueryRequestBuilder; import org.elasticsearch.xpack.esql.action.EsqlResolveFieldsAction; @@ -59,6 +61,7 @@ import org.elasticsearch.xpack.esql.action.RestEsqlAsyncQueryAction; import org.elasticsearch.xpack.esql.action.RestEsqlDeleteAsyncResultAction; import org.elasticsearch.xpack.esql.action.RestEsqlGetAsyncResultAction; +import org.elasticsearch.xpack.esql.action.RestEsqlListQueriesAction; import org.elasticsearch.xpack.esql.action.RestEsqlQueryAction; import org.elasticsearch.xpack.esql.action.RestEsqlStopAsyncAction; import org.elasticsearch.xpack.esql.enrich.EnrichLookupOperator; @@ -227,7 +230,9 @@ public List getActions() { new ActionHandler(XPackInfoFeatureAction.ESQL, EsqlInfoTransportAction.class), new ActionHandler(EsqlResolveFieldsAction.TYPE, EsqlResolveFieldsAction.class), new ActionHandler(EsqlSearchShardsAction.TYPE, EsqlSearchShardsAction.class), - new ActionHandler(EsqlAsyncStopAction.INSTANCE, TransportEsqlAsyncStopAction.class) + new ActionHandler(EsqlAsyncStopAction.INSTANCE, TransportEsqlAsyncStopAction.class), + new ActionHandler(EsqlListQueriesAction.INSTANCE, TransportEsqlListQueriesAction.class), + new ActionHandler(EsqlGetQueryAction.INSTANCE, TransportEsqlGetQueryAction.class) ); } @@ -248,7 +253,8 @@ public List getRestHandlers( new RestEsqlAsyncQueryAction(), new RestEsqlGetAsyncResultAction(), new RestEsqlStopAsyncAction(), - new RestEsqlDeleteAsyncResultAction() + new RestEsqlDeleteAsyncResultAction(), + new RestEsqlListQueriesAction() ); } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlGetQueryAction.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlGetQueryAction.java new file mode 100644 index 0000000000000..70175931ea633 --- /dev/null +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlGetQueryAction.java @@ -0,0 +1,98 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.esql.plugin; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.admin.cluster.node.tasks.get.GetTaskRequest; +import org.elasticsearch.action.admin.cluster.node.tasks.get.GetTaskResponse; +import org.elasticsearch.action.admin.cluster.node.tasks.get.TransportGetTaskAction; +import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest; +import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse; +import org.elasticsearch.action.admin.cluster.node.tasks.list.TransportListTasksAction; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.HandledTransportAction; +import org.elasticsearch.client.internal.node.NodeClient; +import org.elasticsearch.common.util.concurrent.EsExecutors; +import org.elasticsearch.compute.operator.DriverTaskRunner; +import org.elasticsearch.injection.guice.Inject; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.tasks.TaskInfo; +import org.elasticsearch.transport.TransportService; +import org.elasticsearch.xpack.core.ClientHelper; +import org.elasticsearch.xpack.esql.action.EsqlGetQueryAction; +import org.elasticsearch.xpack.esql.action.EsqlGetQueryRequest; +import org.elasticsearch.xpack.esql.action.EsqlQueryAction; + +import static org.elasticsearch.xpack.core.ClientHelper.ESQL_ORIGIN; + +public class TransportEsqlGetQueryAction extends HandledTransportAction { + private final NodeClient nodeClient; + + @Inject + public TransportEsqlGetQueryAction(TransportService transportService, NodeClient nodeClient, ActionFilters actionFilters) { + super(EsqlGetQueryAction.NAME, transportService, actionFilters, EsqlGetQueryRequest::new, EsExecutors.DIRECT_EXECUTOR_SERVICE); + this.nodeClient = nodeClient; + } + + @Override + protected void doExecute(Task task, EsqlGetQueryRequest request, ActionListener listener) { + ClientHelper.executeAsyncWithOrigin( + nodeClient, + ESQL_ORIGIN, + TransportGetTaskAction.TYPE, + new GetTaskRequest().setTaskId(request.id()), + new ActionListener<>() { + @Override + public void onResponse(GetTaskResponse response) { + TaskInfo task = response.getTask().getTask(); + if (task.action().startsWith(EsqlQueryAction.NAME) == false) { + listener.onFailure(new IllegalArgumentException("Task [" + request.id() + "] is not an ESQL query task")); + return; + } + ClientHelper.executeAsyncWithOrigin( + nodeClient, + ESQL_ORIGIN, + TransportListTasksAction.TYPE, + new ListTasksRequest().setDetailed(true) + .setActions(DriverTaskRunner.ACTION_NAME) + .setTargetParentTaskId(request.id()), + new ActionListener<>() { + @Override + public void onResponse(ListTasksResponse response) { + listener.onResponse(new EsqlGetQueryResponse(toDetailedQuery(task, response))); + } + + @Override + public void onFailure(Exception e) { + listener.onFailure(e); + } + } + ); + } + + @Override + public void onFailure(Exception e) { + // The underlying root cause is meaningless to the user, but that is what will be shown, so we remove it. + var withoutCause = new Exception(e.getMessage()); + listener.onFailure(withoutCause); + } + } + ); + } + + private static EsqlGetQueryResponse.DetailedQuery toDetailedQuery(TaskInfo task, ListTasksResponse response) { + return new EsqlGetQueryResponse.DetailedQuery( + task.taskId(), + task.startTime(), + task.runningTimeNanos(), + task.description(), // Query + task.node(), // Coordinating node + response.getTasks().stream().map(TaskInfo::node).distinct().toList() // Data nodes + ); + } +} diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlListQueriesAction.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlListQueriesAction.java new file mode 100644 index 0000000000000..2db7559a669b8 --- /dev/null +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlListQueriesAction.java @@ -0,0 +1,81 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.esql.plugin; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest; +import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse; +import org.elasticsearch.action.admin.cluster.node.tasks.list.TransportListTasksAction; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.HandledTransportAction; +import org.elasticsearch.client.internal.node.NodeClient; +import org.elasticsearch.common.util.concurrent.EsExecutors; +import org.elasticsearch.injection.guice.Inject; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.tasks.TaskInfo; +import org.elasticsearch.transport.TransportService; +import org.elasticsearch.xpack.core.ClientHelper; +import org.elasticsearch.xpack.esql.action.EsqlListQueriesAction; +import org.elasticsearch.xpack.esql.action.EsqlListQueriesRequest; +import org.elasticsearch.xpack.esql.action.EsqlQueryAction; +import org.elasticsearch.xpack.esql.core.async.AsyncTaskManagementService; + +import java.util.List; + +import static org.elasticsearch.xpack.core.ClientHelper.ESQL_ORIGIN; + +public class TransportEsqlListQueriesAction extends HandledTransportAction { + private final NodeClient nodeClient; + + @Inject + public TransportEsqlListQueriesAction(TransportService transportService, NodeClient nodeClient, ActionFilters actionFilters) { + super( + EsqlListQueriesAction.NAME, + transportService, + actionFilters, + EsqlListQueriesRequest::new, + EsExecutors.DIRECT_EXECUTOR_SERVICE + ); + this.nodeClient = nodeClient; + } + + @Override + protected void doExecute(Task task, EsqlListQueriesRequest request, ActionListener listener) { + ClientHelper.executeAsyncWithOrigin( + nodeClient, + ESQL_ORIGIN, + TransportListTasksAction.TYPE, + new ListTasksRequest().setActions(EsqlQueryAction.NAME, EsqlQueryAction.NAME + AsyncTaskManagementService.ASYNC_ACTION_SUFFIX) + .setDetailed(true), + new ActionListener<>() { + @Override + public void onResponse(ListTasksResponse response) { + List queries = response.getTasks() + .stream() + .map(TransportEsqlListQueriesAction::toQuery) + .toList(); + listener.onResponse(new EsqlListQueriesResponse(queries)); + } + + @Override + public void onFailure(Exception e) { + listener.onFailure(e); + } + } + ); + } + + private static EsqlListQueriesResponse.Query toQuery(TaskInfo taskInfo) { + return new EsqlListQueriesResponse.Query( + taskInfo.taskId(), + taskInfo.startTime(), + taskInfo.runningTimeNanos(), + taskInfo.description() + ); + } +} diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java index 2b3a877b48205..2451242aa2afc 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java @@ -380,7 +380,7 @@ public EsqlQueryTask createTask( id, type, action, - request.getDescription(), + request.query(), // Pass the query as the description parentTaskId, headers, originHeaders, diff --git a/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java b/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java index 4c9a5bee5577e..c48e16cb570e5 100644 --- a/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java +++ b/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java @@ -386,6 +386,8 @@ public class Constants { "cluster:monitor/xpack/enrich/coordinator_stats", "cluster:monitor/xpack/enrich/stats", "cluster:monitor/xpack/eql/stats/dist", + "cluster:monitor/xpack/esql/get_query", + "cluster:monitor/xpack/esql/list_queries", "cluster:monitor/xpack/esql/stats/dist", "cluster:monitor/xpack/inference/post", "cluster:monitor/xpack/inference/get", diff --git a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/AuthorizationUtils.java b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/AuthorizationUtils.java index e7457d144fa9a..481c59aa25b60 100644 --- a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/AuthorizationUtils.java +++ b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/AuthorizationUtils.java @@ -32,6 +32,7 @@ import static org.elasticsearch.xpack.core.ClientHelper.DEPRECATION_ORIGIN; import static org.elasticsearch.xpack.core.ClientHelper.ENRICH_ORIGIN; import static org.elasticsearch.xpack.core.ClientHelper.ENT_SEARCH_ORIGIN; +import static org.elasticsearch.xpack.core.ClientHelper.ESQL_ORIGIN; import static org.elasticsearch.xpack.core.ClientHelper.FLEET_ORIGIN; import static org.elasticsearch.xpack.core.ClientHelper.IDP_ORIGIN; import static org.elasticsearch.xpack.core.ClientHelper.INDEX_LIFECYCLE_ORIGIN; @@ -164,6 +165,7 @@ public static void switchUserBasedOnActionOriginAndExecute( case ENT_SEARCH_ORIGIN: case CONNECTORS_ORIGIN: case INFERENCE_ORIGIN: + case ESQL_ORIGIN: case TASKS_ORIGIN: // TODO use a more limited user for tasks securityContext.executeAsInternalUser(InternalUsers.XPACK_USER, version, consumer); break; diff --git a/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/esql/200_queries.yml b/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/esql/200_queries.yml new file mode 100644 index 0000000000000..acc5f9e79f3cc --- /dev/null +++ b/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/esql/200_queries.yml @@ -0,0 +1,40 @@ +--- +setup: + - requires: + test_runner_features: [ capabilities ] + capabilities: + - method: POST + path: /_query + parameters: [ ] + capabilities: [ query_monitoring ] + reason: "uses query monitoring" + - do: + indices.create: + index: test + body: + mappings: + properties: + message1: + type: keyword + +--- +# Since this feature requires queries in the background, the yaml tests only test edge cases with +# no running queries. The rest are covered by integration tests (See EsqlListQueriesActionIT). +List with no running queries: + - do: + esql.list_queries: { } + - match: { queries: { } } + +--- +Get with invalid task ID: + - do: + catch: /malformed task id foobar/ + esql.get_query: + id: "foobar" + +--- +Get with non-existent task ID: + - do: + catch: /task \[foobar:1234\] belongs to the node \[foobar\] which isn't part of the cluster and there is no record of the task/ + esql.get_query: + id: "foobar:1234" diff --git a/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/privileges/11_builtin.yml b/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/privileges/11_builtin.yml index d4e795bf5b8cf..2e501ad016592 100644 --- a/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/privileges/11_builtin.yml +++ b/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/privileges/11_builtin.yml @@ -15,5 +15,5 @@ setup: # This is fragile - it needs to be updated every time we add a new cluster/index privilege # I would much prefer we could just check that specific entries are in the array, but we don't have # an assertion for that - - length: { "cluster" : 62 } + - length: { "cluster" : 63 } - length: { "index" : 24 }