Skip to content
Open
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/137818.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 137818
summary: ES|QL Views REST API
area: "ES|QL"
type: feature
issues: []
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
{
"esql.delete_view": {
"documentation": {
"url": "https://www.elastic.co/docs/api/doc/elasticsearch/operation/operation-esql-view-delete",
"description": "Delete a non-materialized VIEW for ESQL."
},
"stability": "experimental",
"visibility": "public",
"headers": {
"accept": [
"application/json"
],
"content_type": [
"application/json"
]
},
"url": {
"paths": [
{
"path": "/_query/view/{name}",
"methods": [
"DELETE"
],
"parts": {
"name": {
"type": "string",
"description": "The name of the view to delete"
}
}
}
]
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
{
"esql.get_view": {
"documentation": {
"url": "https://www.elastic.co/docs/api/doc/elasticsearch/operation/operation-esql-view-get",
"description": "Get a non-materialized VIEW for ESQL."
},
"stability": "experimental",
"visibility": "public",
"headers": {
"accept": [
"application/json"
],
"content_type": [
"application/json"
]
},
"url": {
"paths": [
{
"path": "/_query/view/{name}",
"methods": [
"GET"
],
"parts": {
"name": {
"type": "list",
"description": "A comma-separated list of view names"
}
}
},
{
"path": "/_query/view",
"methods": [
"GET"
]
}
]
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
{
"esql.put_view": {
"documentation": {
"url": "https://www.elastic.co/docs/api/doc/elasticsearch/operation/operation-esql-view-put",
"description": "Creates a non-materialized VIEW for ESQL."
},
"stability": "experimental",
"visibility": "public",
"headers": {
"accept": [
"application/json"
],
"content_type": [
"application/json"
]
},
"url": {
"paths": [
{
"path": "/_query/view/{name}",
"methods": [
"PUT"
],
"parts": {
"name": {
"type": "string",
"description": "The name of the view to create or update"
}
}
}
]
},
"body": {
"description": "Use the `query` element to define the ES|QL query to use as a non-materialized VIEW.",
"required": true
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
9217000
2 changes: 1 addition & 1 deletion server/src/main/resources/transport/upper_bounds/9.3.csv
Original file line number Diff line number Diff line change
@@ -1 +1 @@
esql_execution_metadata,9216000
esql_views,9217000
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,15 @@ public class EsqlFeatures implements FeatureSpecification {
*/
public static final NodeFeature METRICS_SYNTAX = new NodeFeature("esql.metrics_syntax");

/**
* Cluster support for view management (create, get, update, list)
* This marks that the cluster state has support for the ViewMetadata CustomProject
*/
public static final NodeFeature ESQL_VIEWS = new NodeFeature("esql.views");

private Set<NodeFeature> snapshotBuildFeatures() {
assert Build.current().isSnapshot() : Build.current();
return Set.of(METRICS_SYNTAX);
return Set.of(METRICS_SYNTAX, ESQL_VIEWS);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
package org.elasticsearch.xpack.esql.plugin;

import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.breaker.CircuitBreaker;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
Expand Down Expand Up @@ -48,6 +49,8 @@
import org.elasticsearch.threadpool.ExecutorBuilder;
import org.elasticsearch.threadpool.FixedExecutorBuilder;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xcontent.NamedXContentRegistry;
import org.elasticsearch.xcontent.ParseField;
import org.elasticsearch.xpack.core.XPackPlugin;
import org.elasticsearch.xpack.core.action.XPackInfoFeatureAction;
import org.elasticsearch.xpack.core.action.XPackUsageFeatureAction;
Expand All @@ -74,6 +77,7 @@
import org.elasticsearch.xpack.esql.enrich.LookupFromIndexOperator;
import org.elasticsearch.xpack.esql.execution.PlanExecutor;
import org.elasticsearch.xpack.esql.expression.ExpressionWritables;
import org.elasticsearch.xpack.esql.expression.function.EsqlFunctionRegistry;
import org.elasticsearch.xpack.esql.io.stream.ExpressionQueryBuilder;
import org.elasticsearch.xpack.esql.io.stream.PlanStreamWrapperQueryBuilder;
import org.elasticsearch.xpack.esql.plan.PlanWritables;
Expand All @@ -82,6 +86,18 @@
import org.elasticsearch.xpack.esql.querydsl.query.SingleValueQuery;
import org.elasticsearch.xpack.esql.querylog.EsqlQueryLog;
import org.elasticsearch.xpack.esql.session.IndexResolver;
import org.elasticsearch.xpack.esql.view.ClusterViewService;
import org.elasticsearch.xpack.esql.view.DeleteViewAction;
import org.elasticsearch.xpack.esql.view.GetViewAction;
import org.elasticsearch.xpack.esql.view.PutViewAction;
import org.elasticsearch.xpack.esql.view.RestDeleteViewAction;
import org.elasticsearch.xpack.esql.view.RestGetViewAction;
import org.elasticsearch.xpack.esql.view.RestPutViewAction;
import org.elasticsearch.xpack.esql.view.TransportDeleteViewAction;
import org.elasticsearch.xpack.esql.view.TransportGetViewAction;
import org.elasticsearch.xpack.esql.view.TransportPutViewAction;
import org.elasticsearch.xpack.esql.view.ViewMetadata;
import org.elasticsearch.xpack.esql.view.ViewService;

import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
Expand Down Expand Up @@ -182,6 +198,14 @@ public Collection<?> createComponents(PluginServices services) {
);
BigArrays bigArrays = services.indicesService().getBigArrays().withCircuitBreaking();
var blockFactoryProvider = blockFactoryProvider(circuitBreaker, bigArrays, maxPrimitiveArrayBlockSize);
EsqlFunctionRegistry functionRegistry = new EsqlFunctionRegistry();
ViewService viewService = new ClusterViewService(
functionRegistry,
services.clusterService(),
services.featureService(),
services.projectResolver(),
ViewService.ViewServiceConfig.fromSettings(settings)
);
setupSharedSecrets();
List<BiConsumer<LogicalPlan, Failures>> extraCheckers = extraCheckerProviders.stream()
.flatMap(p -> p.checkers(services.projectResolver(), services.clusterService()).stream())
Expand All @@ -201,7 +225,9 @@ public Collection<?> createComponents(PluginServices services) {
ThreadPool.Names.SEARCH,
blockFactoryProvider.blockFactory()
),
blockFactoryProvider
blockFactoryProvider,
functionRegistry,
viewService
);
}

Expand Down Expand Up @@ -264,7 +290,10 @@ public List<ActionHandler> getActions() {
new ActionHandler(EsqlSearchShardsAction.TYPE, EsqlSearchShardsAction.class),
new ActionHandler(EsqlAsyncStopAction.INSTANCE, TransportEsqlAsyncStopAction.class),
new ActionHandler(EsqlListQueriesAction.INSTANCE, TransportEsqlListQueriesAction.class),
new ActionHandler(EsqlGetQueryAction.INSTANCE, TransportEsqlGetQueryAction.class)
new ActionHandler(EsqlGetQueryAction.INSTANCE, TransportEsqlGetQueryAction.class),
new ActionHandler(PutViewAction.INSTANCE, TransportPutViewAction.class),
new ActionHandler(DeleteViewAction.INSTANCE, TransportDeleteViewAction.class),
new ActionHandler(GetViewAction.INSTANCE, TransportGetViewAction.class)
);
}

Expand All @@ -286,7 +315,10 @@ public List<RestHandler> getRestHandlers(
new RestEsqlGetAsyncResultAction(),
new RestEsqlStopAsyncAction(),
new RestEsqlDeleteAsyncResultAction(),
new RestEsqlListQueriesAction()
new RestEsqlListQueriesAction(),
new RestPutViewAction(),
new RestDeleteViewAction(),
new RestGetViewAction()
);
}

Expand Down Expand Up @@ -315,12 +347,22 @@ public List<NamedWriteableRegistry.Entry> getNamedWriteables() {

entries.add(ExpressionQueryBuilder.ENTRY);
entries.add(PlanStreamWrapperQueryBuilder.ENTRY);
entries.add(ViewMetadata.ENTRY);

entries.addAll(ExpressionWritables.getNamedWriteables());
entries.addAll(PlanWritables.getNamedWriteables());
return entries;
}

@Override
public List<NamedXContentRegistry.Entry> getNamedXContent() {
List<NamedXContentRegistry.Entry> namedXContent = new ArrayList<>();
namedXContent.add(
new NamedXContentRegistry.Entry(Metadata.ProjectCustom.class, new ParseField(ViewMetadata.TYPE), ViewMetadata::fromXContent)
);
return namedXContent;
}

public List<ExecutorBuilder<?>> getExecutorBuilders(Settings settings) {
final int allocatedProcessors = EsExecutors.allocatedProcessors(settings);
return List.of(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.esql.view;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.metadata.ProjectMetadata;
import org.elasticsearch.cluster.project.ProjectResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.core.SuppressForbidden;
import org.elasticsearch.features.FeatureService;
import org.elasticsearch.xpack.esql.expression.function.EsqlFunctionRegistry;
import org.elasticsearch.xpack.esql.plugin.EsqlFeatures;

import java.util.Map;
import java.util.function.Function;

/**
* Implementation of {@link ViewService} that keeps the views in the cluster state.
*/
public class ClusterViewService extends ViewService {
private final ClusterService clusterService;
private final FeatureService featureService;
private final ProjectResolver projectResolver;

public ClusterViewService(
EsqlFunctionRegistry functionRegistry,
ClusterService clusterService,
FeatureService featureService,
ProjectResolver projectResolver,
ViewServiceConfig config
) {
super(functionRegistry, config);
this.clusterService = clusterService;
this.featureService = featureService;
this.projectResolver = projectResolver;
}

@Override
protected ViewMetadata getMetadata() {
return getMetadata(clusterService.state());
}

protected ViewMetadata getMetadata(ClusterState clusterState) {
return getProjectMetadata(clusterState).custom(ViewMetadata.TYPE, ViewMetadata.EMPTY);
}

protected ProjectMetadata getProjectMetadata(ClusterState clusterState) {
return projectResolver.getProjectMetadata(clusterService.state());
}

@Override
protected void updateViewMetadata(ActionListener<Void> callback, Function<ViewMetadata, Map<String, View>> function) {
submitUnbatchedTask("update-esql-view-metadata", new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) {
var project = getProjectMetadata(currentState);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

getProjectMetadata uses the ProjectResolver, but the ProjectResolver is not meant to be used inside cluster state update tasks, as the thread context isn't copied over when running cluster state update tasks (which is what the project resolver relies on in multi-project mode). It's totally understandable that you would implement it like this; when the multi-project work gets picked up, we should put effort into making it clearer that the project resolver isn't supposed to work like this.

That means that you'll have to explicitly pass a ProjectId to this method. I would advise resolving the project ID in the transport actions, because:

I think having this getMetadata() method that relies on the project resolver can cause confusion and possible bugs, because it's not apparent at all that this method only works if the project ID is present in the thread context, which isn't always the case, such as here. I would advise doing something along the lines of:

protected ViewMetadata getMetadata(ProjectId projectId) {
    return getMetadata(clusterService.state().metadata().getProject(projectId));
}

protected ViewMetadata getMetadata(ProjectMetadata projectMetadata) {
    return projectMetadata.custom(ViewMetadata.TYPE, ViewMetadata.EMPTY);
}

Let me know what you think and if you have any questions/concerns.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I originally got this approach from the EnrichPolicyResolver, which uses ProjectResolver, but then also noticed that the EnrichPolicy CRUD used ProjectID instead, so it seems Enrich uses both approaches (but in different places, so probably correct). I can try switch to using ProjectID for CRUD as you suggest.

var views = project.custom(ViewMetadata.TYPE, ViewMetadata.EMPTY);
Map<String, View> policies = function.apply(views);
var metadata = ProjectMetadata.builder(project).putCustom(ViewMetadata.TYPE, new ViewMetadata(policies));
return ClusterState.builder(currentState).putProjectMetadata(metadata).build();
}

@Override
public void clusterStateProcessed(ClusterState oldState, ClusterState newState) {
callback.onResponse(null);
}

@Override
public void onFailure(Exception e) {
callback.onFailure(e);
}
});
}

@SuppressForbidden(reason = "legacy usage of unbatched task") // TODO add support for batching here
private void submitUnbatchedTask(@SuppressWarnings("SameParameterValue") String source, ClusterStateUpdateTask task) {
clusterService.submitUnbatchedStateUpdateTask(source, task);
}
Comment on lines +96 to +99
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure if you were still planning on doing this, but it would be good to use a proper task queue here. I would recommend using a SimpleBatchedAckListenerTaskExecutor, as that avoids you having to implement a boilerplate executor.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This comes from the original prototype, and it would be good to fix this already now.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Interesting, the enrich policy api also uses submitUnbatchedStateUpdateTask...

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah that code is definitely not recent either, hence the TODO in the code :)


@Override
protected void assertMasterNode() {
assert clusterService.localNode().isMasterNode();
}

@Override
protected boolean viewsFeatureEnabled() {
return featureService.clusterHasFeature(clusterService.state(), EsqlFeatures.ESQL_VIEWS);
}
}
Loading