From 63f07aa8e44e4bbe17328709dc372e7e5abe0933 Mon Sep 17 00:00:00 2001 From: Luke Whiting Date: Wed, 21 May 2025 14:46:40 +0100 Subject: [PATCH 01/21] Enable And Disable Endpoint --- modules/streams/build.gradle | 21 ++++ .../streams/src/main/java/module-info.java | 19 +++ .../rest/streams/StreamsMetadata.java | 78 ++++++++++++ .../rest/streams/StreamsPlugin.java | 67 ++++++++++ .../LogsStreamsActivationToggleAction.java | 60 +++++++++ .../logs/RestSetLogStreamsEnabledAction.java | 53 ++++++++ .../TransportLogsStreamsToggleActivation.java | 119 ++++++++++++++++++ .../org/elasticsearch/TransportVersions.java | 2 +- .../SequentialTaskAckingTaskExecutor.java | 26 ++++ 9 files changed, 444 insertions(+), 1 deletion(-) create mode 100644 modules/streams/build.gradle create mode 100644 modules/streams/src/main/java/module-info.java create mode 100644 modules/streams/src/main/java/org/elasticsearch/rest/streams/StreamsMetadata.java create mode 100644 modules/streams/src/main/java/org/elasticsearch/rest/streams/StreamsPlugin.java create mode 100644 modules/streams/src/main/java/org/elasticsearch/rest/streams/logs/LogsStreamsActivationToggleAction.java create mode 100644 modules/streams/src/main/java/org/elasticsearch/rest/streams/logs/RestSetLogStreamsEnabledAction.java create mode 100644 modules/streams/src/main/java/org/elasticsearch/rest/streams/logs/TransportLogsStreamsToggleActivation.java create mode 100644 server/src/main/java/org/elasticsearch/cluster/SequentialTaskAckingTaskExecutor.java diff --git a/modules/streams/build.gradle b/modules/streams/build.gradle new file mode 100644 index 0000000000000..e00a12ffb2951 --- /dev/null +++ b/modules/streams/build.gradle @@ -0,0 +1,21 @@ +apply plugin: 'elasticsearch.test-with-dependencies' +apply plugin: 'elasticsearch.internal-cluster-test' +apply plugin: 'elasticsearch.internal-yaml-rest-test' +apply plugin: 'elasticsearch.internal-java-rest-test' +apply plugin: 'elasticsearch.yaml-rest-compat-test' + +esplugin { + description = 'The module adds support for the wired streams functionality including logs ingest' + classname = 'org.elasticsearch.rest.streams.StreamsPlugin' +} + +restResources { + restApi { + // TODO: Limit this down to just required API's for faster build. See https://github.com/elastic/elasticsearch/blob/fb3149cc664eb7061d55741a87e7e8cf29db4989/TESTING.asciidoc#L443 + include '*' + } +} + +dependencies { + testImplementation project(path: ':test:test-clusters') +} diff --git a/modules/streams/src/main/java/module-info.java b/modules/streams/src/main/java/module-info.java new file mode 100644 index 0000000000000..1a48a5dbc4fb8 --- /dev/null +++ b/modules/streams/src/main/java/module-info.java @@ -0,0 +1,19 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +module org.elasticsearch.rest.root { + requires org.elasticsearch.server; + requires org.elasticsearch.xcontent; + requires org.apache.lucene.core; + requires org.elasticsearch.base; + requires org.apache.logging.log4j; + + exports org.elasticsearch.rest.streams; + exports org.elasticsearch.rest.streams.logs; +} diff --git a/modules/streams/src/main/java/org/elasticsearch/rest/streams/StreamsMetadata.java b/modules/streams/src/main/java/org/elasticsearch/rest/streams/StreamsMetadata.java new file mode 100644 index 0000000000000..128e0fb56eccf --- /dev/null +++ b/modules/streams/src/main/java/org/elasticsearch/rest/streams/StreamsMetadata.java @@ -0,0 +1,78 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.rest.streams; + +import org.elasticsearch.TransportVersion; +import org.elasticsearch.TransportVersions; +import org.elasticsearch.cluster.AbstractNamedDiffable; +import org.elasticsearch.cluster.metadata.Metadata; +import org.elasticsearch.common.collect.Iterators; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.xcontent.ChunkedToXContentHelper; +import org.elasticsearch.xcontent.ToXContent; + +import java.io.IOException; +import java.util.EnumSet; +import java.util.Iterator; + +/** + * Metadata for the Streams feature, which allows enabling or disabling logs for data streams. + * This class implements the Metadata.ProjectCustom interface to allow it to be stored in the cluster state. + */ +public class StreamsMetadata extends AbstractNamedDiffable implements Metadata.ProjectCustom { + + public static final String TYPE = "streams"; + public static final StreamsMetadata EMPTY = new StreamsMetadata(false); + + public boolean logsEnabled; + + public StreamsMetadata(StreamInput in) throws IOException { + logsEnabled = in.readBoolean(); + } + + public StreamsMetadata(boolean logsEnabled) { + this.logsEnabled = logsEnabled; + } + + public boolean isLogsEnabled() { + return logsEnabled; + } + + public void setLogsEnabled(boolean logsEnabled) { + this.logsEnabled = logsEnabled; + } + + @Override + public EnumSet context() { + return Metadata.ALL_CONTEXTS; + } + + @Override + public String getWriteableName() { + return TYPE; + } + + @Override + public TransportVersion getMinimalSupportedVersion() { + return TransportVersions.STREAMS_LOGS_SUPPORT; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeBoolean(logsEnabled); + } + + @Override + public Iterator toXContentChunked(ToXContent.Params params) { + return Iterators.concat(ChunkedToXContentHelper.chunk((builder, bParams) -> builder.field("logs_enabled", logsEnabled))); + } + +} diff --git a/modules/streams/src/main/java/org/elasticsearch/rest/streams/StreamsPlugin.java b/modules/streams/src/main/java/org/elasticsearch/rest/streams/StreamsPlugin.java new file mode 100644 index 0000000000000..de62e3302911f --- /dev/null +++ b/modules/streams/src/main/java/org/elasticsearch/rest/streams/StreamsPlugin.java @@ -0,0 +1,67 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.rest.streams; + +import org.elasticsearch.cluster.metadata.DataStream; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.metadata.Metadata; +import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.common.io.stream.NamedWriteableRegistry; +import org.elasticsearch.common.settings.ClusterSettings; +import org.elasticsearch.common.settings.IndexScopedSettings; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.settings.SettingsFilter; +import org.elasticsearch.features.NodeFeature; +import org.elasticsearch.plugins.ActionPlugin; +import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.rest.RestController; +import org.elasticsearch.rest.RestHandler; +import org.elasticsearch.rest.streams.logs.LogsStreamsActivationToggleAction; +import org.elasticsearch.rest.streams.logs.RestSetLogStreamsEnabledAction; +import org.elasticsearch.rest.streams.logs.TransportLogsStreamsToggleActivation; + +import java.util.Collections; +import java.util.List; +import java.util.function.Predicate; +import java.util.function.Supplier; + +/** + * Document here the Streams plugin. + */ +public class StreamsPlugin extends Plugin implements ActionPlugin { + + @Override + public List getRestHandlers( + Settings settings, + NamedWriteableRegistry namedWriteableRegistry, + RestController restController, + ClusterSettings clusterSettings, + IndexScopedSettings indexScopedSettings, + SettingsFilter settingsFilter, + IndexNameExpressionResolver indexNameExpressionResolver, + Supplier nodesInCluster, + Predicate clusterSupportsFeature + ) { + if (DataStream.LOGS_STREAM_FEATURE_FLAG) { + return List.of(new RestSetLogStreamsEnabledAction()); + } + return Collections.emptyList(); + } + + @Override + public List getActions() { + return List.of(new ActionHandler(LogsStreamsActivationToggleAction.INSTANCE, TransportLogsStreamsToggleActivation.class)); + } + + @Override + public List getNamedWriteables() { + return List.of(new NamedWriteableRegistry.Entry(Metadata.ProjectCustom.class, StreamsMetadata.TYPE, StreamsMetadata::new)); + } +} diff --git a/modules/streams/src/main/java/org/elasticsearch/rest/streams/logs/LogsStreamsActivationToggleAction.java b/modules/streams/src/main/java/org/elasticsearch/rest/streams/logs/LogsStreamsActivationToggleAction.java new file mode 100644 index 0000000000000..248e6bd7b90df --- /dev/null +++ b/modules/streams/src/main/java/org/elasticsearch/rest/streams/logs/LogsStreamsActivationToggleAction.java @@ -0,0 +1,60 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.rest.streams.logs; + +import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.action.ActionType; +import org.elasticsearch.action.support.master.AcknowledgedRequest; +import org.elasticsearch.action.support.master.AcknowledgedResponse; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.core.TimeValue; + +import java.io.IOException; + +public class LogsStreamsActivationToggleAction { + + public static ActionType INSTANCE = new ActionType<>("cluster:admin/streams/logs/toggle"); + + public static class Request extends AcknowledgedRequest { + + private final boolean enable; + + public Request(TimeValue masterNodeTimeout, TimeValue ackTimeout, boolean enable) { + super(masterNodeTimeout, ackTimeout); + this.enable = enable; + } + + public Request(StreamInput in) throws IOException { + super(in); + this.enable = in.readBoolean(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeBoolean(enable); + } + + @Override + public ActionRequestValidationException validate() { + return null; + } + + @Override + public String toString() { + return "LogsStreamsActivationToggleAction.Request{" + "enable=" + enable + '}'; + } + + public boolean shouldEnable() { + return enable; + } + } +} diff --git a/modules/streams/src/main/java/org/elasticsearch/rest/streams/logs/RestSetLogStreamsEnabledAction.java b/modules/streams/src/main/java/org/elasticsearch/rest/streams/logs/RestSetLogStreamsEnabledAction.java new file mode 100644 index 0000000000000..7c10d73b3c565 --- /dev/null +++ b/modules/streams/src/main/java/org/elasticsearch/rest/streams/logs/RestSetLogStreamsEnabledAction.java @@ -0,0 +1,53 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.rest.streams.logs; + +import org.elasticsearch.client.internal.node.NodeClient; +import org.elasticsearch.rest.BaseRestHandler; +import org.elasticsearch.rest.RestRequest; +import org.elasticsearch.rest.RestUtils; +import org.elasticsearch.rest.Scope; +import org.elasticsearch.rest.ServerlessScope; +import org.elasticsearch.rest.action.RestToXContentListener; + +import java.io.IOException; +import java.util.List; + +import static org.elasticsearch.rest.RestRequest.Method.GET; + +@ServerlessScope(Scope.PUBLIC) +public class RestSetLogStreamsEnabledAction extends BaseRestHandler { + @Override + public String getName() { + return "streams_logs_set_enabled_action"; + } + + // TODO: Drop note in streams channel to check if it's OK to have one security permission for both enable and disable + @Override + public List routes() { + return List.of(new Route(GET, "/_streams/logs/_enable"), new Route(GET, "/_streams/logs/_disable")); + } + + @Override + protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException { + final boolean enabled = request.path().endsWith("_enable"); + assert enabled || request.path().endsWith("_disable"); + return restChannel -> client.execute( + LogsStreamsActivationToggleAction.INSTANCE, + new LogsStreamsActivationToggleAction.Request( + RestUtils.getMasterNodeTimeout(request), + RestUtils.getAckTimeout(request), + enabled + ), + new RestToXContentListener<>(restChannel) + ); + } + +} diff --git a/modules/streams/src/main/java/org/elasticsearch/rest/streams/logs/TransportLogsStreamsToggleActivation.java b/modules/streams/src/main/java/org/elasticsearch/rest/streams/logs/TransportLogsStreamsToggleActivation.java new file mode 100644 index 0000000000000..0709e5c87cc1c --- /dev/null +++ b/modules/streams/src/main/java/org/elasticsearch/rest/streams/logs/TransportLogsStreamsToggleActivation.java @@ -0,0 +1,119 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.rest.streams.logs; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.master.AcknowledgedRequest; +import org.elasticsearch.action.support.master.AcknowledgedResponse; +import org.elasticsearch.action.support.master.AcknowledgedTransportMasterNodeAction; +import org.elasticsearch.cluster.AckedClusterStateUpdateTask; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.SequentialTaskAckingTaskExecutor; +import org.elasticsearch.cluster.block.ClusterBlockException; +import org.elasticsearch.cluster.block.ClusterBlockLevel; +import org.elasticsearch.cluster.metadata.ProjectId; +import org.elasticsearch.cluster.project.ProjectResolver; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.cluster.service.MasterServiceTaskQueue; +import org.elasticsearch.common.Priority; +import org.elasticsearch.injection.guice.Inject; +import org.elasticsearch.rest.streams.StreamsMetadata; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportService; + +import java.util.Locale; + +public class TransportLogsStreamsToggleActivation extends AcknowledgedTransportMasterNodeAction { + + private static final Logger logger = LogManager.getLogger(TransportLogsStreamsToggleActivation.class); + + private final ProjectResolver projectResolver; + private final MasterServiceTaskQueue taskQueue; + + @Inject + public TransportLogsStreamsToggleActivation( + String actionName, + TransportService transportService, + ClusterService clusterService, + ThreadPool threadPool, + ActionFilters actionFilters, + ProjectResolver projectResolver + ) { + super( + actionName, + transportService, + clusterService, + threadPool, + actionFilters, + LogsStreamsActivationToggleAction.Request::new, + threadPool.executor(ThreadPool.Names.MANAGEMENT) + ); + this.taskQueue = clusterService.createTaskQueue( + "streams-update-state-queue", + Priority.NORMAL, + new SequentialTaskAckingTaskExecutor<>() + ); + this.projectResolver = projectResolver; + } + + @Override + protected void masterOperation( + Task task, + LogsStreamsActivationToggleAction.Request request, + ClusterState state, + ActionListener listener + ) throws Exception { + ProjectId projectId = projectResolver.getProjectId(); + StreamsMetadata streamsState = state.projectState(projectId).metadata().custom(StreamsMetadata.TYPE, StreamsMetadata.EMPTY); + boolean currentlyEnabled = streamsState.isLogsEnabled(); + boolean shouldEnable = request.shouldEnable(); + if (shouldEnable != currentlyEnabled) { + StreamsMetadataUpdateTask updateTask = new StreamsMetadataUpdateTask(request, listener, projectId, shouldEnable); + String taskName = String.format(Locale.ROOT, "enable-streams-logs-[%s]", shouldEnable ? "enable" : "disable"); + taskQueue.submitTask(taskName, updateTask, updateTask.timeout()); + } else { + logger.debug("Logs streams are already in the requested state: {}", shouldEnable); + listener.onResponse(AcknowledgedResponse.TRUE); + } + } + + @Override + protected ClusterBlockException checkBlock(LogsStreamsActivationToggleAction.Request request, ClusterState state) { + return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE); + } + + static class StreamsMetadataUpdateTask extends AckedClusterStateUpdateTask { + private final ProjectId projectId; + private final boolean enabled; + + StreamsMetadataUpdateTask( + AcknowledgedRequest request, + ActionListener listener, + ProjectId projectId, + boolean enabled + ) { + super(request, listener); + this.projectId = projectId; + this.enabled = enabled; + } + + @Override + public ClusterState execute(ClusterState currentState) throws Exception { + return currentState.copyAndUpdateProject( + projectId, + builder -> builder.putCustom(StreamsMetadata.TYPE, new StreamsMetadata(enabled)) + ); + } + } +} diff --git a/server/src/main/java/org/elasticsearch/TransportVersions.java b/server/src/main/java/org/elasticsearch/TransportVersions.java index 8bf8a94fccfe0..848215573bc03 100644 --- a/server/src/main/java/org/elasticsearch/TransportVersions.java +++ b/server/src/main/java/org/elasticsearch/TransportVersions.java @@ -298,7 +298,7 @@ static TransportVersion def(int id) { public static final TransportVersion HEAP_USAGE_IN_CLUSTER_INFO = def(9_096_0_00); public static final TransportVersion NONE_CHUNKING_STRATEGY = def(9_097_0_00); public static final TransportVersion PROJECT_DELETION_GLOBAL_BLOCK = def(9_098_0_00); - + public static final TransportVersion STREAMS_LOGS_SUPPORT = def(9_099_0_00); /* * STOP! READ THIS FIRST! No, really, * ____ _____ ___ ____ _ ____ _____ _ ____ _____ _ _ ___ ____ _____ ___ ____ ____ _____ _ diff --git a/server/src/main/java/org/elasticsearch/cluster/SequentialTaskAckingTaskExecutor.java b/server/src/main/java/org/elasticsearch/cluster/SequentialTaskAckingTaskExecutor.java new file mode 100644 index 0000000000000..7eee6d4c7a5bc --- /dev/null +++ b/server/src/main/java/org/elasticsearch/cluster/SequentialTaskAckingTaskExecutor.java @@ -0,0 +1,26 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.cluster; + +import org.elasticsearch.core.Tuple; + +/** + * A task executor that executes tasks sequentially, allowing each task to acknowledge the cluster state update. + * This executor is used for tasks that need to be executed one after another, where each task can produce a new + * cluster state and can listen for acknowledgments. + * + * @param The type of the task that extends {@link AckedClusterStateUpdateTask}. + */ +public class SequentialTaskAckingTaskExecutor extends SimpleBatchedAckListenerTaskExecutor { + @Override + public Tuple executeTask(Task task, ClusterState clusterState) throws Exception { + return Tuple.tuple(task.execute(clusterState), task); + } +} From 2fa713febdd01dbe9c9719b28ea22bac76a732ad Mon Sep 17 00:00:00 2001 From: Luke Whiting Date: Tue, 10 Jun 2025 17:05:45 +0100 Subject: [PATCH 02/21] Status Endpoint --- .../rest/streams/StreamsPlugin.java | 10 ++- .../logs/RestSetLogStreamsEnabledAction.java | 1 - .../streams/logs/RestStreamsStatusAction.java | 48 ++++++++++++ .../streams/logs/StreamsStatusAction.java | 78 +++++++++++++++++++ .../TransportLogsStreamsToggleActivation.java | 3 +- .../logs/TransportStreamsStatusAction.java | 70 +++++++++++++++++ 6 files changed, 205 insertions(+), 5 deletions(-) create mode 100644 modules/streams/src/main/java/org/elasticsearch/rest/streams/logs/RestStreamsStatusAction.java create mode 100644 modules/streams/src/main/java/org/elasticsearch/rest/streams/logs/StreamsStatusAction.java create mode 100644 modules/streams/src/main/java/org/elasticsearch/rest/streams/logs/TransportStreamsStatusAction.java diff --git a/modules/streams/src/main/java/org/elasticsearch/rest/streams/StreamsPlugin.java b/modules/streams/src/main/java/org/elasticsearch/rest/streams/StreamsPlugin.java index de62e3302911f..8caca422e24d8 100644 --- a/modules/streams/src/main/java/org/elasticsearch/rest/streams/StreamsPlugin.java +++ b/modules/streams/src/main/java/org/elasticsearch/rest/streams/StreamsPlugin.java @@ -25,7 +25,10 @@ import org.elasticsearch.rest.RestHandler; import org.elasticsearch.rest.streams.logs.LogsStreamsActivationToggleAction; import org.elasticsearch.rest.streams.logs.RestSetLogStreamsEnabledAction; +import org.elasticsearch.rest.streams.logs.RestStreamsStatusAction; +import org.elasticsearch.rest.streams.logs.StreamsStatusAction; import org.elasticsearch.rest.streams.logs.TransportLogsStreamsToggleActivation; +import org.elasticsearch.rest.streams.logs.TransportStreamsStatusAction; import java.util.Collections; import java.util.List; @@ -50,14 +53,17 @@ public List getRestHandlers( Predicate clusterSupportsFeature ) { if (DataStream.LOGS_STREAM_FEATURE_FLAG) { - return List.of(new RestSetLogStreamsEnabledAction()); + return List.of(new RestSetLogStreamsEnabledAction(), new RestStreamsStatusAction()); } return Collections.emptyList(); } @Override public List getActions() { - return List.of(new ActionHandler(LogsStreamsActivationToggleAction.INSTANCE, TransportLogsStreamsToggleActivation.class)); + return List.of( + new ActionHandler(LogsStreamsActivationToggleAction.INSTANCE, TransportLogsStreamsToggleActivation.class), + new ActionHandler(StreamsStatusAction.INSTANCE, TransportStreamsStatusAction.class) + ); } @Override diff --git a/modules/streams/src/main/java/org/elasticsearch/rest/streams/logs/RestSetLogStreamsEnabledAction.java b/modules/streams/src/main/java/org/elasticsearch/rest/streams/logs/RestSetLogStreamsEnabledAction.java index 7c10d73b3c565..bde037b4bde50 100644 --- a/modules/streams/src/main/java/org/elasticsearch/rest/streams/logs/RestSetLogStreamsEnabledAction.java +++ b/modules/streams/src/main/java/org/elasticsearch/rest/streams/logs/RestSetLogStreamsEnabledAction.java @@ -29,7 +29,6 @@ public String getName() { return "streams_logs_set_enabled_action"; } - // TODO: Drop note in streams channel to check if it's OK to have one security permission for both enable and disable @Override public List routes() { return List.of(new Route(GET, "/_streams/logs/_enable"), new Route(GET, "/_streams/logs/_disable")); diff --git a/modules/streams/src/main/java/org/elasticsearch/rest/streams/logs/RestStreamsStatusAction.java b/modules/streams/src/main/java/org/elasticsearch/rest/streams/logs/RestStreamsStatusAction.java new file mode 100644 index 0000000000000..24c430a379c9f --- /dev/null +++ b/modules/streams/src/main/java/org/elasticsearch/rest/streams/logs/RestStreamsStatusAction.java @@ -0,0 +1,48 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.rest.streams.logs; + +import org.elasticsearch.client.internal.node.NodeClient; +import org.elasticsearch.rest.BaseRestHandler; +import org.elasticsearch.rest.RestHandler; +import org.elasticsearch.rest.RestRequest; +import org.elasticsearch.rest.RestUtils; +import org.elasticsearch.rest.Scope; +import org.elasticsearch.rest.ServerlessScope; +import org.elasticsearch.rest.action.RestToXContentListener; + +import java.io.IOException; +import java.util.List; + +import static org.elasticsearch.rest.RestRequest.Method.GET; + +@ServerlessScope(Scope.PUBLIC) +public class RestStreamsStatusAction extends BaseRestHandler { + + @Override + public String getName() { + return "streams_status_action"; + } + + @Override + public List routes() { + return List.of(new RestHandler.Route(GET, "/_streams/_status")); + } + + @Override + protected BaseRestHandler.RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException { + return restChannel -> client.execute( + StreamsStatusAction.INSTANCE, + new StreamsStatusAction.Request(RestUtils.getMasterNodeTimeout(request)), + new RestToXContentListener<>(restChannel) + ); + } + +} diff --git a/modules/streams/src/main/java/org/elasticsearch/rest/streams/logs/StreamsStatusAction.java b/modules/streams/src/main/java/org/elasticsearch/rest/streams/logs/StreamsStatusAction.java new file mode 100644 index 0000000000000..efd4f6591e1a9 --- /dev/null +++ b/modules/streams/src/main/java/org/elasticsearch/rest/streams/logs/StreamsStatusAction.java @@ -0,0 +1,78 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.rest.streams.logs; + +import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.action.ActionType; +import org.elasticsearch.action.support.master.MasterNodeReadRequest; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.core.TimeValue; +import org.elasticsearch.xcontent.ToXContentObject; +import org.elasticsearch.xcontent.XContentBuilder; + +import java.io.IOException; + +public class StreamsStatusAction { + + public static ActionType INSTANCE = new ActionType<>("cluster:admin/streams/status"); + + public static class Request extends MasterNodeReadRequest { + + public Request(TimeValue masterNodeTimeout) { + super(masterNodeTimeout); + } + + public Request(StreamInput in) throws IOException { + super(in); + } + + @Override + public ActionRequestValidationException validate() { + return null; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + } + } + + public static class Response extends ActionResponse implements ToXContentObject { + + private final boolean logs_enabled; + + public Response(boolean logsEnabled) { + logs_enabled = logsEnabled; + } + + public Response(StreamInput in) throws IOException { + logs_enabled = in.readBoolean(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeBoolean(logs_enabled); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + + builder.startObject("logs"); + builder.field("enabled", logs_enabled); + builder.endObject(); + + builder.endObject(); + return builder; + } + } +} diff --git a/modules/streams/src/main/java/org/elasticsearch/rest/streams/logs/TransportLogsStreamsToggleActivation.java b/modules/streams/src/main/java/org/elasticsearch/rest/streams/logs/TransportLogsStreamsToggleActivation.java index 0709e5c87cc1c..0467832341a98 100644 --- a/modules/streams/src/main/java/org/elasticsearch/rest/streams/logs/TransportLogsStreamsToggleActivation.java +++ b/modules/streams/src/main/java/org/elasticsearch/rest/streams/logs/TransportLogsStreamsToggleActivation.java @@ -43,7 +43,6 @@ public class TransportLogsStreamsToggleActivation extends AcknowledgedTransportM @Inject public TransportLogsStreamsToggleActivation( - String actionName, TransportService transportService, ClusterService clusterService, ThreadPool threadPool, @@ -51,7 +50,7 @@ public TransportLogsStreamsToggleActivation( ProjectResolver projectResolver ) { super( - actionName, + LogsStreamsActivationToggleAction.INSTANCE.name(), transportService, clusterService, threadPool, diff --git a/modules/streams/src/main/java/org/elasticsearch/rest/streams/logs/TransportStreamsStatusAction.java b/modules/streams/src/main/java/org/elasticsearch/rest/streams/logs/TransportStreamsStatusAction.java new file mode 100644 index 0000000000000..445723bc9d291 --- /dev/null +++ b/modules/streams/src/main/java/org/elasticsearch/rest/streams/logs/TransportStreamsStatusAction.java @@ -0,0 +1,70 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.rest.streams.logs; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.master.TransportMasterNodeReadAction; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.block.ClusterBlockException; +import org.elasticsearch.cluster.block.ClusterBlockLevel; +import org.elasticsearch.cluster.metadata.ProjectId; +import org.elasticsearch.cluster.project.ProjectResolver; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.injection.guice.Inject; +import org.elasticsearch.rest.streams.StreamsMetadata; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportService; + +public class TransportStreamsStatusAction extends TransportMasterNodeReadAction { + + private final ProjectResolver projectResolver; + + @Inject + public TransportStreamsStatusAction( + TransportService transportService, + ClusterService clusterService, + ThreadPool threadPool, + ActionFilters actionFilters, + ProjectResolver projectResolver + ) { + super( + StreamsStatusAction.INSTANCE.name(), + transportService, + clusterService, + threadPool, + actionFilters, + StreamsStatusAction.Request::new, + StreamsStatusAction.Response::new, + threadPool.executor(ThreadPool.Names.MANAGEMENT) + ); + this.projectResolver = projectResolver; + } + + @Override + protected void masterOperation( + Task task, + StreamsStatusAction.Request request, + ClusterState state, + ActionListener listener + ) throws Exception { + ProjectId projectId = projectResolver.getProjectId(); + StreamsMetadata streamsState = state.projectState(projectId).metadata().custom(StreamsMetadata.TYPE, StreamsMetadata.EMPTY); + boolean logsEnabled = streamsState.isLogsEnabled(); + StreamsStatusAction.Response response = new StreamsStatusAction.Response(logsEnabled); + listener.onResponse(response); + } + + @Override + protected ClusterBlockException checkBlock(StreamsStatusAction.Request request, ClusterState state) { + return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_READ); + } +} From 5dc679cb0df8f18ec30d87b65b8334dfbcc6917b Mon Sep 17 00:00:00 2001 From: Luke Whiting Date: Thu, 12 Jun 2025 09:19:10 +0100 Subject: [PATCH 03/21] Integration Tests --- .../rest/streams/TestToggleIT.java | 61 +++++++++++++++++++ .../rest/streams/StreamsMetadata.java | 25 ++++++-- .../rest/streams/StreamsPlugin.java | 6 +- 3 files changed, 87 insertions(+), 5 deletions(-) create mode 100644 modules/streams/src/internalClusterTest/java/org/elasticsearch/rest/streams/TestToggleIT.java diff --git a/modules/streams/src/internalClusterTest/java/org/elasticsearch/rest/streams/TestToggleIT.java b/modules/streams/src/internalClusterTest/java/org/elasticsearch/rest/streams/TestToggleIT.java new file mode 100644 index 0000000000000..c6f00c855b05e --- /dev/null +++ b/modules/streams/src/internalClusterTest/java/org/elasticsearch/rest/streams/TestToggleIT.java @@ -0,0 +1,61 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.rest.streams; + +import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest; +import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; +import org.elasticsearch.action.support.master.AcknowledgedResponse; +import org.elasticsearch.cluster.metadata.ProjectId; +import org.elasticsearch.cluster.metadata.ProjectMetadata; +import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.rest.streams.logs.LogsStreamsActivationToggleAction; +import org.elasticsearch.test.ESIntegTestCase; +import org.elasticsearch.test.hamcrest.ElasticsearchAssertions; +import org.elasticsearch.test.transport.MockTransportService; + +import java.io.IOException; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.ExecutionException; + +import static org.hamcrest.Matchers.is; + +public class TestToggleIT extends ESIntegTestCase { + + @Override + protected Collection> nodePlugins() { + return List.of(StreamsPlugin.class, MockTransportService.TestPlugin.class); + } + + public void testLogStreamToggle() throws IOException, ExecutionException, InterruptedException { + boolean[] testParams = new boolean[] { true, false, true }; + for (boolean enable : testParams) { + doLogStreamToggleTest(enable); + } + } + + private void doLogStreamToggleTest(boolean enable) throws IOException, ExecutionException, InterruptedException { + LogsStreamsActivationToggleAction.Request request = new LogsStreamsActivationToggleAction.Request( + TEST_REQUEST_TIMEOUT, + TEST_REQUEST_TIMEOUT, + enable + ); + + AcknowledgedResponse acknowledgedResponse = client().execute(LogsStreamsActivationToggleAction.INSTANCE, request).get(); + ElasticsearchAssertions.assertAcked(acknowledgedResponse); + + ClusterStateRequest state = new ClusterStateRequest(TEST_REQUEST_TIMEOUT); + ClusterStateResponse clusterStateResponse = client().admin().cluster().state(state).get(); + ProjectMetadata projectMetadata = clusterStateResponse.getState().metadata().getProject(ProjectId.DEFAULT); + + assertThat(projectMetadata.custom(StreamsMetadata.TYPE).isLogsEnabled(), is(enable)); + } + +} diff --git a/modules/streams/src/main/java/org/elasticsearch/rest/streams/StreamsMetadata.java b/modules/streams/src/main/java/org/elasticsearch/rest/streams/StreamsMetadata.java index 128e0fb56eccf..311911a05cdc4 100644 --- a/modules/streams/src/main/java/org/elasticsearch/rest/streams/StreamsMetadata.java +++ b/modules/streams/src/main/java/org/elasticsearch/rest/streams/StreamsMetadata.java @@ -12,6 +12,7 @@ import org.elasticsearch.TransportVersion; import org.elasticsearch.TransportVersions; import org.elasticsearch.cluster.AbstractNamedDiffable; +import org.elasticsearch.cluster.NamedDiff; import org.elasticsearch.cluster.metadata.Metadata; import org.elasticsearch.common.collect.Iterators; import org.elasticsearch.common.io.stream.StreamInput; @@ -22,6 +23,7 @@ import java.io.IOException; import java.util.EnumSet; import java.util.Iterator; +import java.util.Objects; /** * Metadata for the Streams feature, which allows enabling or disabling logs for data streams. @@ -46,10 +48,6 @@ public boolean isLogsEnabled() { return logsEnabled; } - public void setLogsEnabled(boolean logsEnabled) { - this.logsEnabled = logsEnabled; - } - @Override public EnumSet context() { return Metadata.ALL_CONTEXTS; @@ -65,6 +63,12 @@ public TransportVersion getMinimalSupportedVersion() { return TransportVersions.STREAMS_LOGS_SUPPORT; } + public static NamedDiff readDiffFrom(StreamInput in) throws IOException { + return readDiffFrom(Metadata.ProjectCustom.class, TYPE, in); + } + + + @Override public void writeTo(StreamOutput out) throws IOException { out.writeBoolean(logsEnabled); @@ -75,4 +79,17 @@ public Iterator toXContentChunked(ToXContent.Params params return Iterators.concat(ChunkedToXContentHelper.chunk((builder, bParams) -> builder.field("logs_enabled", logsEnabled))); } + @Override + public boolean equals(Object o) { + if ((o instanceof StreamsMetadata that)) { + return logsEnabled == that.logsEnabled; + } else { + return false; + } + } + + @Override + public int hashCode() { + return Objects.hashCode(logsEnabled); + } } diff --git a/modules/streams/src/main/java/org/elasticsearch/rest/streams/StreamsPlugin.java b/modules/streams/src/main/java/org/elasticsearch/rest/streams/StreamsPlugin.java index 8caca422e24d8..396669c7ce5d2 100644 --- a/modules/streams/src/main/java/org/elasticsearch/rest/streams/StreamsPlugin.java +++ b/modules/streams/src/main/java/org/elasticsearch/rest/streams/StreamsPlugin.java @@ -9,6 +9,7 @@ package org.elasticsearch.rest.streams; +import org.elasticsearch.cluster.NamedDiff; import org.elasticsearch.cluster.metadata.DataStream; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.metadata.Metadata; @@ -68,6 +69,9 @@ public List getActions() { @Override public List getNamedWriteables() { - return List.of(new NamedWriteableRegistry.Entry(Metadata.ProjectCustom.class, StreamsMetadata.TYPE, StreamsMetadata::new)); + return List.of( + new NamedWriteableRegistry.Entry(Metadata.ProjectCustom.class, StreamsMetadata.TYPE, StreamsMetadata::new), + new NamedWriteableRegistry.Entry(NamedDiff.class, StreamsMetadata.TYPE, StreamsMetadata::readDiffFrom) + ); } } From 54d57304adc8cc463c0863d09486442930f08caa Mon Sep 17 00:00:00 2001 From: Luke Whiting Date: Thu, 12 Jun 2025 13:06:08 +0100 Subject: [PATCH 04/21] REST Spec --- .../streams/logs/RestStreamsStatusAction.java | 2 +- .../api/streams.logs.disable.json | 27 ++++++++++++++ .../api/streams.logs.enable.json | 27 ++++++++++++++ .../rest-api-spec/api/streams.status.json | 35 +++++++++++++++++++ 4 files changed, 90 insertions(+), 1 deletion(-) create mode 100644 rest-api-spec/src/main/resources/rest-api-spec/api/streams.logs.disable.json create mode 100644 rest-api-spec/src/main/resources/rest-api-spec/api/streams.logs.enable.json create mode 100644 rest-api-spec/src/main/resources/rest-api-spec/api/streams.status.json diff --git a/modules/streams/src/main/java/org/elasticsearch/rest/streams/logs/RestStreamsStatusAction.java b/modules/streams/src/main/java/org/elasticsearch/rest/streams/logs/RestStreamsStatusAction.java index 24c430a379c9f..d8983060d7f5f 100644 --- a/modules/streams/src/main/java/org/elasticsearch/rest/streams/logs/RestStreamsStatusAction.java +++ b/modules/streams/src/main/java/org/elasticsearch/rest/streams/logs/RestStreamsStatusAction.java @@ -33,7 +33,7 @@ public String getName() { @Override public List routes() { - return List.of(new RestHandler.Route(GET, "/_streams/_status")); + return List.of(new RestHandler.Route(GET, "/_streams/status")); } @Override diff --git a/rest-api-spec/src/main/resources/rest-api-spec/api/streams.logs.disable.json b/rest-api-spec/src/main/resources/rest-api-spec/api/streams.logs.disable.json new file mode 100644 index 0000000000000..77414c55c9329 --- /dev/null +++ b/rest-api-spec/src/main/resources/rest-api-spec/api/streams.logs.disable.json @@ -0,0 +1,27 @@ +{ + "streams.logs.disable": { + "documentation": { + "url": "https://www.elastic.co/guide/en/elasticsearch/reference/master/streams-logs-disable.html", + "description": "Disable the Logs Streams feature for this cluster" + }, + "stability": "stable", + "visibility": "public", + "headers": { + "accept": [ + "application/json", + "text/plain" + ] + }, + "url": { + "paths": [ + { + "path": "/_streams/logs/_disable", + "methods": [ + "GET" + ] + } + ] + }, + "params": {} + } +} diff --git a/rest-api-spec/src/main/resources/rest-api-spec/api/streams.logs.enable.json b/rest-api-spec/src/main/resources/rest-api-spec/api/streams.logs.enable.json new file mode 100644 index 0000000000000..a191095b2c13b --- /dev/null +++ b/rest-api-spec/src/main/resources/rest-api-spec/api/streams.logs.enable.json @@ -0,0 +1,27 @@ +{ + "streams.logs.enable": { + "documentation": { + "url": "https://www.elastic.co/guide/en/elasticsearch/reference/master/streams-logs-enable.html", + "description": "Enable the Logs Streams feature for this cluster" + }, + "stability": "stable", + "visibility": "public", + "headers": { + "accept": [ + "application/json", + "text/plain" + ] + }, + "url": { + "paths": [ + { + "path": "/_streams/logs/_enable", + "methods": [ + "GET" + ] + } + ] + }, + "params": {} + } +} diff --git a/rest-api-spec/src/main/resources/rest-api-spec/api/streams.status.json b/rest-api-spec/src/main/resources/rest-api-spec/api/streams.status.json new file mode 100644 index 0000000000000..50e03c8ebfb01 --- /dev/null +++ b/rest-api-spec/src/main/resources/rest-api-spec/api/streams.status.json @@ -0,0 +1,35 @@ +{ + "streams.status": { + "documentation": { + "url": "https://www.elastic.co/guide/en/elasticsearch/reference/master/streams-status.html", + "description": "Return the current status of the streams feature for each streams type" + }, + "stability": "stable", + "visibility": "public", + "headers": { + "accept": [ + "application/json" + ] + }, + "url": { + "paths": [ + { + "path": "/_streams/status", + "methods": [ + "GET" + ] + } + ] + }, + "params": { + "local": { + "type": "boolean", + "description": "Return local information, do not retrieve the state from master node (default: false)" + }, + "master_timeout": { + "type": "time", + "description": "Specify timeout for connection to master" + } + } + } +} From 7b7d52a77c5a3cef14920a910b721cc2b380ce11 Mon Sep 17 00:00:00 2001 From: Luke Whiting Date: Thu, 12 Jun 2025 13:56:06 +0100 Subject: [PATCH 05/21] REST Spec tests --- modules/streams/build.gradle | 12 ++++++ .../streams/StreamsYamlTestSuiteIT.java | 37 ++++++++++++++++ .../test/streams/logs/10_basic.yml | 43 +++++++++++++++++++ ...disable.json => streams.logs_disable.json} | 2 +- ...s.enable.json => streams.logs_enable.json} | 2 +- 5 files changed, 94 insertions(+), 2 deletions(-) create mode 100644 modules/streams/src/yamlRestTest/java/org/elasticsearch/streams/StreamsYamlTestSuiteIT.java create mode 100644 modules/streams/src/yamlRestTest/resources/rest-api-spec/test/streams/logs/10_basic.yml rename rest-api-spec/src/main/resources/rest-api-spec/api/{streams.logs.disable.json => streams.logs_disable.json} (95%) rename rest-api-spec/src/main/resources/rest-api-spec/api/{streams.logs.enable.json => streams.logs_enable.json} (95%) diff --git a/modules/streams/build.gradle b/modules/streams/build.gradle index e00a12ffb2951..0798c2199ebd7 100644 --- a/modules/streams/build.gradle +++ b/modules/streams/build.gradle @@ -16,6 +16,18 @@ restResources { } } +configurations { + basicRestSpecs { + attributes { + attribute(ArtifactTypeDefinition.ARTIFACT_TYPE_ATTRIBUTE, ArtifactTypeDefinition.DIRECTORY_TYPE) + } + } +} + +artifacts { + basicRestSpecs(new File(projectDir, "src/yamlRestTest/resources/rest-api-spec/test")) +} + dependencies { testImplementation project(path: ':test:test-clusters') } diff --git a/modules/streams/src/yamlRestTest/java/org/elasticsearch/streams/StreamsYamlTestSuiteIT.java b/modules/streams/src/yamlRestTest/java/org/elasticsearch/streams/StreamsYamlTestSuiteIT.java new file mode 100644 index 0000000000000..7a6e4665e9e63 --- /dev/null +++ b/modules/streams/src/yamlRestTest/java/org/elasticsearch/streams/StreamsYamlTestSuiteIT.java @@ -0,0 +1,37 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.streams; + +import com.carrotsearch.randomizedtesting.annotations.Name; +import com.carrotsearch.randomizedtesting.annotations.ParametersFactory; + +import org.elasticsearch.test.cluster.ElasticsearchCluster; +import org.elasticsearch.test.rest.yaml.ClientYamlTestCandidate; +import org.elasticsearch.test.rest.yaml.ESClientYamlSuiteTestCase; +import org.junit.ClassRule; + +public class StreamsYamlTestSuiteIT extends ESClientYamlSuiteTestCase { + public StreamsYamlTestSuiteIT(@Name("yaml") ClientYamlTestCandidate testCandidate) { + super(testCandidate); + } + + @ParametersFactory + public static Iterable parameters() throws Exception { + return ESClientYamlSuiteTestCase.createParameters(); + } + + @ClassRule + public static ElasticsearchCluster cluster = ElasticsearchCluster.local().module("streams").build(); + + @Override + protected String getTestRestCluster() { + return cluster.getHttpAddresses(); + } +} diff --git a/modules/streams/src/yamlRestTest/resources/rest-api-spec/test/streams/logs/10_basic.yml b/modules/streams/src/yamlRestTest/resources/rest-api-spec/test/streams/logs/10_basic.yml new file mode 100644 index 0000000000000..3c1bc7d6c53c1 --- /dev/null +++ b/modules/streams/src/yamlRestTest/resources/rest-api-spec/test/streams/logs/10_basic.yml @@ -0,0 +1,43 @@ +--- +"Basic toggle of logs state enable to disable and back": + - do: + streams.logs_enable: { } + - is_true: acknowledged + + - do: + streams.status: { } + - is_true: logs.enabled + + - do: + streams.logs_disable: { } + - is_true: acknowledged + + - do: + streams.status: { } + - is_false: logs.enabled + + - do: + streams.logs_enable: { } + - is_true: acknowledged + + - do: + streams.status: { } + - is_true: logs.enabled + +--- +"Check for repeated toggle to same state": + - do: + streams.logs_enable: { } + - is_true: acknowledged + + - do: + streams.status: { } + - is_true: logs.enabled + + - do: + streams.logs_enable: { } + - is_true: acknowledged + + - do: + streams.status: { } + - is_true: logs.enabled diff --git a/rest-api-spec/src/main/resources/rest-api-spec/api/streams.logs.disable.json b/rest-api-spec/src/main/resources/rest-api-spec/api/streams.logs_disable.json similarity index 95% rename from rest-api-spec/src/main/resources/rest-api-spec/api/streams.logs.disable.json rename to rest-api-spec/src/main/resources/rest-api-spec/api/streams.logs_disable.json index 77414c55c9329..bc61688f9cfa2 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/api/streams.logs.disable.json +++ b/rest-api-spec/src/main/resources/rest-api-spec/api/streams.logs_disable.json @@ -1,5 +1,5 @@ { - "streams.logs.disable": { + "streams.logs_disable": { "documentation": { "url": "https://www.elastic.co/guide/en/elasticsearch/reference/master/streams-logs-disable.html", "description": "Disable the Logs Streams feature for this cluster" diff --git a/rest-api-spec/src/main/resources/rest-api-spec/api/streams.logs.enable.json b/rest-api-spec/src/main/resources/rest-api-spec/api/streams.logs_enable.json similarity index 95% rename from rest-api-spec/src/main/resources/rest-api-spec/api/streams.logs.enable.json rename to rest-api-spec/src/main/resources/rest-api-spec/api/streams.logs_enable.json index a191095b2c13b..49b7f008d9ab7 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/api/streams.logs.enable.json +++ b/rest-api-spec/src/main/resources/rest-api-spec/api/streams.logs_enable.json @@ -1,5 +1,5 @@ { - "streams.logs.enable": { + "streams.logs_enable": { "documentation": { "url": "https://www.elastic.co/guide/en/elasticsearch/reference/master/streams-logs-enable.html", "description": "Enable the Logs Streams feature for this cluster" From 6ed2fdd74f046c8e805c3c117fd4973eb26a172d Mon Sep 17 00:00:00 2001 From: Luke Whiting Date: Thu, 12 Jun 2025 15:02:13 +0100 Subject: [PATCH 06/21] Some documentation --- .../java/org/elasticsearch/rest/streams/StreamsMetadata.java | 2 -- .../java/org/elasticsearch/rest/streams/StreamsPlugin.java | 4 +++- .../streams/logs/TransportLogsStreamsToggleActivation.java | 3 +++ .../rest/streams/logs/TransportStreamsStatusAction.java | 4 ++++ 4 files changed, 10 insertions(+), 3 deletions(-) diff --git a/modules/streams/src/main/java/org/elasticsearch/rest/streams/StreamsMetadata.java b/modules/streams/src/main/java/org/elasticsearch/rest/streams/StreamsMetadata.java index 311911a05cdc4..529dd97673ed3 100644 --- a/modules/streams/src/main/java/org/elasticsearch/rest/streams/StreamsMetadata.java +++ b/modules/streams/src/main/java/org/elasticsearch/rest/streams/StreamsMetadata.java @@ -67,8 +67,6 @@ public static NamedDiff readDiffFrom(StreamInput in) thr return readDiffFrom(Metadata.ProjectCustom.class, TYPE, in); } - - @Override public void writeTo(StreamOutput out) throws IOException { out.writeBoolean(logsEnabled); diff --git a/modules/streams/src/main/java/org/elasticsearch/rest/streams/StreamsPlugin.java b/modules/streams/src/main/java/org/elasticsearch/rest/streams/StreamsPlugin.java index 396669c7ce5d2..acf807c42873f 100644 --- a/modules/streams/src/main/java/org/elasticsearch/rest/streams/StreamsPlugin.java +++ b/modules/streams/src/main/java/org/elasticsearch/rest/streams/StreamsPlugin.java @@ -37,7 +37,9 @@ import java.util.function.Supplier; /** - * Document here the Streams plugin. + * This plugin provides the Streams feature which builds upon data streams to + * provide the user with a more "batteries included" experience for ingesting large + * streams of data, such as logs. */ public class StreamsPlugin extends Plugin implements ActionPlugin { diff --git a/modules/streams/src/main/java/org/elasticsearch/rest/streams/logs/TransportLogsStreamsToggleActivation.java b/modules/streams/src/main/java/org/elasticsearch/rest/streams/logs/TransportLogsStreamsToggleActivation.java index 0467832341a98..c2a01ceeff948 100644 --- a/modules/streams/src/main/java/org/elasticsearch/rest/streams/logs/TransportLogsStreamsToggleActivation.java +++ b/modules/streams/src/main/java/org/elasticsearch/rest/streams/logs/TransportLogsStreamsToggleActivation.java @@ -34,6 +34,9 @@ import java.util.Locale; +/** + * Transport action to toggle the activation state of logs streams in a project / cluster. + */ public class TransportLogsStreamsToggleActivation extends AcknowledgedTransportMasterNodeAction { private static final Logger logger = LogManager.getLogger(TransportLogsStreamsToggleActivation.class); diff --git a/modules/streams/src/main/java/org/elasticsearch/rest/streams/logs/TransportStreamsStatusAction.java b/modules/streams/src/main/java/org/elasticsearch/rest/streams/logs/TransportStreamsStatusAction.java index 445723bc9d291..e5a148c222174 100644 --- a/modules/streams/src/main/java/org/elasticsearch/rest/streams/logs/TransportStreamsStatusAction.java +++ b/modules/streams/src/main/java/org/elasticsearch/rest/streams/logs/TransportStreamsStatusAction.java @@ -24,6 +24,10 @@ import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; +/** + * Transport action to retrieve the status of logs streams in a project / cluster. + * Results are broken down by stream type. Currently only logs streams are implemented. + */ public class TransportStreamsStatusAction extends TransportMasterNodeReadAction { private final ProjectResolver projectResolver; From 130f5b8089409fa5c19d4d996cc2170900a12d81 Mon Sep 17 00:00:00 2001 From: Luke Whiting Date: Mon, 16 Jun 2025 13:08:20 +0100 Subject: [PATCH 07/21] Update docs/changelog/129474.yaml --- docs/changelog/129474.yaml | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 docs/changelog/129474.yaml diff --git a/docs/changelog/129474.yaml b/docs/changelog/129474.yaml new file mode 100644 index 0000000000000..d3da30aea942c --- /dev/null +++ b/docs/changelog/129474.yaml @@ -0,0 +1,5 @@ +pr: 129474 +summary: "Streams - Log's Enable, Disable and Status endpoints" +area: Data streams +type: feature +issues: [] From 2b7ad57c367a6a1a0d1d64b6c86a84077a467a07 Mon Sep 17 00:00:00 2001 From: Luke Whiting Date: Mon, 16 Jun 2025 15:05:47 +0100 Subject: [PATCH 08/21] Fix failing security test --- .../org/elasticsearch/xpack/security/operator/Constants.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java b/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java index 891a0badfd741..72f50b070dca1 100644 --- a/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java +++ b/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java @@ -72,6 +72,8 @@ public class Constants { "cluster:admin/script_language/get", "cluster:admin/scripts/painless/context", "cluster:admin/scripts/painless/execute", + "cluster:admin/streams/logs/toggle", + "cluster:admin/streams/status", "cluster:admin/synonyms/delete", "cluster:admin/synonyms/get", "cluster:admin/synonyms/put", From e601c4d586780dc64be48551b1b1ea1d4882020a Mon Sep 17 00:00:00 2001 From: Luke Whiting Date: Tue, 17 Jun 2025 10:08:23 +0100 Subject: [PATCH 09/21] PR Fixes --- .../rest/streams/logs/RestSetLogStreamsEnabledAction.java | 4 ++-- .../resources/rest-api-spec/api/streams.logs_disable.json | 4 ++-- .../main/resources/rest-api-spec/api/streams.logs_enable.json | 4 ++-- .../src/main/resources/rest-api-spec/api/streams.status.json | 2 +- 4 files changed, 7 insertions(+), 7 deletions(-) diff --git a/modules/streams/src/main/java/org/elasticsearch/rest/streams/logs/RestSetLogStreamsEnabledAction.java b/modules/streams/src/main/java/org/elasticsearch/rest/streams/logs/RestSetLogStreamsEnabledAction.java index bde037b4bde50..57e034a2955c0 100644 --- a/modules/streams/src/main/java/org/elasticsearch/rest/streams/logs/RestSetLogStreamsEnabledAction.java +++ b/modules/streams/src/main/java/org/elasticsearch/rest/streams/logs/RestSetLogStreamsEnabledAction.java @@ -20,7 +20,7 @@ import java.io.IOException; import java.util.List; -import static org.elasticsearch.rest.RestRequest.Method.GET; +import static org.elasticsearch.rest.RestRequest.Method.POST; @ServerlessScope(Scope.PUBLIC) public class RestSetLogStreamsEnabledAction extends BaseRestHandler { @@ -31,7 +31,7 @@ public String getName() { @Override public List routes() { - return List.of(new Route(GET, "/_streams/logs/_enable"), new Route(GET, "/_streams/logs/_disable")); + return List.of(new Route(POST, "/_streams/logs/_enable"), new Route(POST, "/_streams/logs/_disable")); } @Override diff --git a/rest-api-spec/src/main/resources/rest-api-spec/api/streams.logs_disable.json b/rest-api-spec/src/main/resources/rest-api-spec/api/streams.logs_disable.json index bc61688f9cfa2..f79d589d1078b 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/api/streams.logs_disable.json +++ b/rest-api-spec/src/main/resources/rest-api-spec/api/streams.logs_disable.json @@ -5,7 +5,7 @@ "description": "Disable the Logs Streams feature for this cluster" }, "stability": "stable", - "visibility": "public", + "visibility": "feature_flag", "headers": { "accept": [ "application/json", @@ -17,7 +17,7 @@ { "path": "/_streams/logs/_disable", "methods": [ - "GET" + "POST" ] } ] diff --git a/rest-api-spec/src/main/resources/rest-api-spec/api/streams.logs_enable.json b/rest-api-spec/src/main/resources/rest-api-spec/api/streams.logs_enable.json index 49b7f008d9ab7..628cdc3ffd4b1 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/api/streams.logs_enable.json +++ b/rest-api-spec/src/main/resources/rest-api-spec/api/streams.logs_enable.json @@ -5,7 +5,7 @@ "description": "Enable the Logs Streams feature for this cluster" }, "stability": "stable", - "visibility": "public", + "visibility": "feature_flag", "headers": { "accept": [ "application/json", @@ -17,7 +17,7 @@ { "path": "/_streams/logs/_enable", "methods": [ - "GET" + "POST" ] } ] diff --git a/rest-api-spec/src/main/resources/rest-api-spec/api/streams.status.json b/rest-api-spec/src/main/resources/rest-api-spec/api/streams.status.json index 50e03c8ebfb01..5cac8982bdadd 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/api/streams.status.json +++ b/rest-api-spec/src/main/resources/rest-api-spec/api/streams.status.json @@ -5,7 +5,7 @@ "description": "Return the current status of the streams feature for each streams type" }, "stability": "stable", - "visibility": "public", + "visibility": "feature_flag", "headers": { "accept": [ "application/json" From 323caca139ae27d1043c4a67a165869c5ab73dd9 Mon Sep 17 00:00:00 2001 From: Luke Whiting Date: Tue, 17 Jun 2025 10:24:04 +0100 Subject: [PATCH 10/21] PR Fixes - Add missing feature flag name to YAML spec --- .../main/resources/rest-api-spec/api/streams.logs_disable.json | 1 + .../main/resources/rest-api-spec/api/streams.logs_enable.json | 1 + .../src/main/resources/rest-api-spec/api/streams.status.json | 1 + 3 files changed, 3 insertions(+) diff --git a/rest-api-spec/src/main/resources/rest-api-spec/api/streams.logs_disable.json b/rest-api-spec/src/main/resources/rest-api-spec/api/streams.logs_disable.json index f79d589d1078b..aab77623da88f 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/api/streams.logs_disable.json +++ b/rest-api-spec/src/main/resources/rest-api-spec/api/streams.logs_disable.json @@ -6,6 +6,7 @@ }, "stability": "stable", "visibility": "feature_flag", + "feature_flag": "logs_stream", "headers": { "accept": [ "application/json", diff --git a/rest-api-spec/src/main/resources/rest-api-spec/api/streams.logs_enable.json b/rest-api-spec/src/main/resources/rest-api-spec/api/streams.logs_enable.json index 628cdc3ffd4b1..40936797df49d 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/api/streams.logs_enable.json +++ b/rest-api-spec/src/main/resources/rest-api-spec/api/streams.logs_enable.json @@ -6,6 +6,7 @@ }, "stability": "stable", "visibility": "feature_flag", + "feature_flag": "logs_stream", "headers": { "accept": [ "application/json", diff --git a/rest-api-spec/src/main/resources/rest-api-spec/api/streams.status.json b/rest-api-spec/src/main/resources/rest-api-spec/api/streams.status.json index 5cac8982bdadd..baefa165c1ac4 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/api/streams.status.json +++ b/rest-api-spec/src/main/resources/rest-api-spec/api/streams.status.json @@ -6,6 +6,7 @@ }, "stability": "stable", "visibility": "feature_flag", + "feature_flag": "logs_stream", "headers": { "accept": [ "application/json" From c65a5fe1f73deba4baebbf05d1aca3d264b78049 Mon Sep 17 00:00:00 2001 From: Luke Whiting Date: Tue, 17 Jun 2025 10:55:28 +0100 Subject: [PATCH 11/21] PR Fixes - Fix support for timeout and master_timeout parameters --- .../streams/logs/RestSetLogStreamsEnabledAction.java | 9 +++++++-- .../rest/streams/logs/RestStreamsStatusAction.java | 9 +++++++-- .../rest-api-spec/api/streams.logs_disable.json | 11 ++++++++++- .../rest-api-spec/api/streams.logs_enable.json | 11 ++++++++++- .../resources/rest-api-spec/api/streams.status.json | 6 +----- 5 files changed, 35 insertions(+), 11 deletions(-) diff --git a/modules/streams/src/main/java/org/elasticsearch/rest/streams/logs/RestSetLogStreamsEnabledAction.java b/modules/streams/src/main/java/org/elasticsearch/rest/streams/logs/RestSetLogStreamsEnabledAction.java index 57e034a2955c0..b740c073f1811 100644 --- a/modules/streams/src/main/java/org/elasticsearch/rest/streams/logs/RestSetLogStreamsEnabledAction.java +++ b/modules/streams/src/main/java/org/elasticsearch/rest/streams/logs/RestSetLogStreamsEnabledAction.java @@ -17,8 +17,8 @@ import org.elasticsearch.rest.ServerlessScope; import org.elasticsearch.rest.action.RestToXContentListener; -import java.io.IOException; import java.util.List; +import java.util.Set; import static org.elasticsearch.rest.RestRequest.Method.POST; @@ -35,7 +35,7 @@ public List routes() { } @Override - protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException { + protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) { final boolean enabled = request.path().endsWith("_enable"); assert enabled || request.path().endsWith("_disable"); return restChannel -> client.execute( @@ -49,4 +49,9 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli ); } + @Override + public Set supportedQueryParameters() { + return Set.of(RestUtils.REST_MASTER_TIMEOUT_PARAM, RestUtils.REST_TIMEOUT_PARAM); + } + } diff --git a/modules/streams/src/main/java/org/elasticsearch/rest/streams/logs/RestStreamsStatusAction.java b/modules/streams/src/main/java/org/elasticsearch/rest/streams/logs/RestStreamsStatusAction.java index d8983060d7f5f..42aab9dd711fa 100644 --- a/modules/streams/src/main/java/org/elasticsearch/rest/streams/logs/RestStreamsStatusAction.java +++ b/modules/streams/src/main/java/org/elasticsearch/rest/streams/logs/RestStreamsStatusAction.java @@ -18,8 +18,9 @@ import org.elasticsearch.rest.ServerlessScope; import org.elasticsearch.rest.action.RestToXContentListener; -import java.io.IOException; +import java.util.Collections; import java.util.List; +import java.util.Set; import static org.elasticsearch.rest.RestRequest.Method.GET; @@ -37,7 +38,7 @@ public List routes() { } @Override - protected BaseRestHandler.RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException { + protected BaseRestHandler.RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) { return restChannel -> client.execute( StreamsStatusAction.INSTANCE, new StreamsStatusAction.Request(RestUtils.getMasterNodeTimeout(request)), @@ -45,4 +46,8 @@ protected BaseRestHandler.RestChannelConsumer prepareRequest(RestRequest request ); } + @Override + public Set supportedQueryParameters() { + return Collections.singleton(RestUtils.REST_MASTER_TIMEOUT_PARAM); + } } diff --git a/rest-api-spec/src/main/resources/rest-api-spec/api/streams.logs_disable.json b/rest-api-spec/src/main/resources/rest-api-spec/api/streams.logs_disable.json index aab77623da88f..20f0e0c2feba6 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/api/streams.logs_disable.json +++ b/rest-api-spec/src/main/resources/rest-api-spec/api/streams.logs_disable.json @@ -23,6 +23,15 @@ } ] }, - "params": {} + "params": { + "timeout": { + "type": "time", + "description": "Period to wait for a response. If no response is received before the timeout expires, the request fails and returns an error." + }, + "master_timeout": { + "type": "time", + "description": "Period to wait for a connection to the master node. If no response is received before the timeout expires, the request fails and returns an error." + } + } } } diff --git a/rest-api-spec/src/main/resources/rest-api-spec/api/streams.logs_enable.json b/rest-api-spec/src/main/resources/rest-api-spec/api/streams.logs_enable.json index 40936797df49d..adaf544d9b604 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/api/streams.logs_enable.json +++ b/rest-api-spec/src/main/resources/rest-api-spec/api/streams.logs_enable.json @@ -23,6 +23,15 @@ } ] }, - "params": {} + "params": { + "timeout": { + "type": "time", + "description": "Period to wait for a response. If no response is received before the timeout expires, the request fails and returns an error." + }, + "master_timeout": { + "type": "time", + "description": "Period to wait for a connection to the master node. If no response is received before the timeout expires, the request fails and returns an error." + } + } } } diff --git a/rest-api-spec/src/main/resources/rest-api-spec/api/streams.status.json b/rest-api-spec/src/main/resources/rest-api-spec/api/streams.status.json index baefa165c1ac4..784064249ecc7 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/api/streams.status.json +++ b/rest-api-spec/src/main/resources/rest-api-spec/api/streams.status.json @@ -23,13 +23,9 @@ ] }, "params": { - "local": { - "type": "boolean", - "description": "Return local information, do not retrieve the state from master node (default: false)" - }, "master_timeout": { "type": "time", - "description": "Specify timeout for connection to master" + "description": "Period to wait for a connection to the master node. If no response is received before the timeout expires, the request fails and returns an error." } } } From 8fa2ccd6274d53abbce6ccac7afd1975b7570935 Mon Sep 17 00:00:00 2001 From: Luke Whiting Date: Tue, 17 Jun 2025 11:41:37 +0100 Subject: [PATCH 12/21] PR Fixes - Make the REST handler validation happy with the new params --- .../logs/RestSetLogStreamsEnabledAction.java | 18 ++++++++++++------ .../streams/logs/RestStreamsStatusAction.java | 11 +++++------ 2 files changed, 17 insertions(+), 12 deletions(-) diff --git a/modules/streams/src/main/java/org/elasticsearch/rest/streams/logs/RestSetLogStreamsEnabledAction.java b/modules/streams/src/main/java/org/elasticsearch/rest/streams/logs/RestSetLogStreamsEnabledAction.java index b740c073f1811..2de2a3a21b9f8 100644 --- a/modules/streams/src/main/java/org/elasticsearch/rest/streams/logs/RestSetLogStreamsEnabledAction.java +++ b/modules/streams/src/main/java/org/elasticsearch/rest/streams/logs/RestSetLogStreamsEnabledAction.java @@ -24,6 +24,9 @@ @ServerlessScope(Scope.PUBLIC) public class RestSetLogStreamsEnabledAction extends BaseRestHandler { + + public static final Set SUPPORTED_PARAMS = Set.of(RestUtils.REST_MASTER_TIMEOUT_PARAM, RestUtils.REST_TIMEOUT_PARAM); + @Override public String getName() { return "streams_logs_set_enabled_action"; @@ -38,20 +41,23 @@ public List routes() { protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) { final boolean enabled = request.path().endsWith("_enable"); assert enabled || request.path().endsWith("_disable"); + + LogsStreamsActivationToggleAction.Request activationRequest = new LogsStreamsActivationToggleAction.Request( + RestUtils.getMasterNodeTimeout(request), + RestUtils.getAckTimeout(request), + enabled + ); + return restChannel -> client.execute( LogsStreamsActivationToggleAction.INSTANCE, - new LogsStreamsActivationToggleAction.Request( - RestUtils.getMasterNodeTimeout(request), - RestUtils.getAckTimeout(request), - enabled - ), + activationRequest, new RestToXContentListener<>(restChannel) ); } @Override public Set supportedQueryParameters() { - return Set.of(RestUtils.REST_MASTER_TIMEOUT_PARAM, RestUtils.REST_TIMEOUT_PARAM); + return SUPPORTED_PARAMS; } } diff --git a/modules/streams/src/main/java/org/elasticsearch/rest/streams/logs/RestStreamsStatusAction.java b/modules/streams/src/main/java/org/elasticsearch/rest/streams/logs/RestStreamsStatusAction.java index 42aab9dd711fa..f9b99de18b33f 100644 --- a/modules/streams/src/main/java/org/elasticsearch/rest/streams/logs/RestStreamsStatusAction.java +++ b/modules/streams/src/main/java/org/elasticsearch/rest/streams/logs/RestStreamsStatusAction.java @@ -27,6 +27,8 @@ @ServerlessScope(Scope.PUBLIC) public class RestStreamsStatusAction extends BaseRestHandler { + public static final Set SUPPORTED_PARAMS = Collections.singleton(RestUtils.REST_MASTER_TIMEOUT_PARAM); + @Override public String getName() { return "streams_status_action"; @@ -39,15 +41,12 @@ public List routes() { @Override protected BaseRestHandler.RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) { - return restChannel -> client.execute( - StreamsStatusAction.INSTANCE, - new StreamsStatusAction.Request(RestUtils.getMasterNodeTimeout(request)), - new RestToXContentListener<>(restChannel) - ); + StreamsStatusAction.Request statusRequest = new StreamsStatusAction.Request(RestUtils.getMasterNodeTimeout(request)); + return restChannel -> client.execute(StreamsStatusAction.INSTANCE, statusRequest, new RestToXContentListener<>(restChannel)); } @Override public Set supportedQueryParameters() { - return Collections.singleton(RestUtils.REST_MASTER_TIMEOUT_PARAM); + return SUPPORTED_PARAMS; } } From 3f244d3b6d802dfa5d8f60d655c8f58fc1f2b764 Mon Sep 17 00:00:00 2001 From: Luke Whiting Date: Tue, 17 Jun 2025 14:05:20 +0100 Subject: [PATCH 13/21] Delete docs/changelog/129474.yaml --- docs/changelog/129474.yaml | 5 ----- 1 file changed, 5 deletions(-) delete mode 100644 docs/changelog/129474.yaml diff --git a/docs/changelog/129474.yaml b/docs/changelog/129474.yaml deleted file mode 100644 index d3da30aea942c..0000000000000 --- a/docs/changelog/129474.yaml +++ /dev/null @@ -1,5 +0,0 @@ -pr: 129474 -summary: "Streams - Log's Enable, Disable and Status endpoints" -area: Data streams -type: feature -issues: [] From 6d1b6548bba5dfdf8e74a4fa693a2197f37cd412 Mon Sep 17 00:00:00 2001 From: Luke Whiting Date: Tue, 17 Jun 2025 15:25:07 +0100 Subject: [PATCH 14/21] PR Fixes - Switch to local metadata action type and improve request handling --- modules/streams/build.gradle | 12 +++++- .../streams/logs/RestStreamsStatusAction.java | 11 +++-- .../streams/logs/StreamsStatusAction.java | 27 +++++------- .../logs/TransportStreamsStatusAction.java | 41 ++++++++----------- .../rest-api-spec/api/streams.status.json | 4 +- .../org/elasticsearch/TransportVersions.java | 1 + .../build.gradle | 10 +++++ 7 files changed, 59 insertions(+), 47 deletions(-) diff --git a/modules/streams/build.gradle b/modules/streams/build.gradle index 0798c2199ebd7..fd56a627026b6 100644 --- a/modules/streams/build.gradle +++ b/modules/streams/build.gradle @@ -1,3 +1,12 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + apply plugin: 'elasticsearch.test-with-dependencies' apply plugin: 'elasticsearch.internal-cluster-test' apply plugin: 'elasticsearch.internal-yaml-rest-test' @@ -11,8 +20,7 @@ esplugin { restResources { restApi { - // TODO: Limit this down to just required API's for faster build. See https://github.com/elastic/elasticsearch/blob/fb3149cc664eb7061d55741a87e7e8cf29db4989/TESTING.asciidoc#L443 - include '*' + include '_common', 'streams' } } diff --git a/modules/streams/src/main/java/org/elasticsearch/rest/streams/logs/RestStreamsStatusAction.java b/modules/streams/src/main/java/org/elasticsearch/rest/streams/logs/RestStreamsStatusAction.java index f9b99de18b33f..65adec761689c 100644 --- a/modules/streams/src/main/java/org/elasticsearch/rest/streams/logs/RestStreamsStatusAction.java +++ b/modules/streams/src/main/java/org/elasticsearch/rest/streams/logs/RestStreamsStatusAction.java @@ -16,6 +16,7 @@ import org.elasticsearch.rest.RestUtils; import org.elasticsearch.rest.Scope; import org.elasticsearch.rest.ServerlessScope; +import org.elasticsearch.rest.action.RestCancellableNodeClient; import org.elasticsearch.rest.action.RestToXContentListener; import java.util.Collections; @@ -27,7 +28,7 @@ @ServerlessScope(Scope.PUBLIC) public class RestStreamsStatusAction extends BaseRestHandler { - public static final Set SUPPORTED_PARAMS = Collections.singleton(RestUtils.REST_MASTER_TIMEOUT_PARAM); + public static final Set SUPPORTED_PARAMS = Collections.singleton(RestUtils.REST_TIMEOUT_PARAM); @Override public String getName() { @@ -41,8 +42,12 @@ public List routes() { @Override protected BaseRestHandler.RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) { - StreamsStatusAction.Request statusRequest = new StreamsStatusAction.Request(RestUtils.getMasterNodeTimeout(request)); - return restChannel -> client.execute(StreamsStatusAction.INSTANCE, statusRequest, new RestToXContentListener<>(restChannel)); + StreamsStatusAction.Request statusRequest = new StreamsStatusAction.Request(RestUtils.getAckTimeout(request)); + return restChannel -> new RestCancellableNodeClient(client, request.getHttpChannel()).execute( + StreamsStatusAction.INSTANCE, + statusRequest, + new RestToXContentListener<>(restChannel) + ); } @Override diff --git a/modules/streams/src/main/java/org/elasticsearch/rest/streams/logs/StreamsStatusAction.java b/modules/streams/src/main/java/org/elasticsearch/rest/streams/logs/StreamsStatusAction.java index efd4f6591e1a9..76f87ad25b47b 100644 --- a/modules/streams/src/main/java/org/elasticsearch/rest/streams/logs/StreamsStatusAction.java +++ b/modules/streams/src/main/java/org/elasticsearch/rest/streams/logs/StreamsStatusAction.java @@ -9,40 +9,33 @@ package org.elasticsearch.rest.streams.logs; -import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.ActionResponse; import org.elasticsearch.action.ActionType; -import org.elasticsearch.action.support.master.MasterNodeReadRequest; +import org.elasticsearch.action.support.local.LocalClusterStateRequest; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.core.TimeValue; +import org.elasticsearch.tasks.CancellableTask; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.tasks.TaskId; import org.elasticsearch.xcontent.ToXContentObject; import org.elasticsearch.xcontent.XContentBuilder; import java.io.IOException; +import java.util.Map; public class StreamsStatusAction { public static ActionType INSTANCE = new ActionType<>("cluster:admin/streams/status"); - public static class Request extends MasterNodeReadRequest { - - public Request(TimeValue masterNodeTimeout) { - super(masterNodeTimeout); - } - - public Request(StreamInput in) throws IOException { - super(in); + public static class Request extends LocalClusterStateRequest { + protected Request(TimeValue masterTimeout) { + super(masterTimeout); } @Override - public ActionRequestValidationException validate() { - return null; - } - - @Override - public void writeTo(StreamOutput out) throws IOException { - super.writeTo(out); + public Task createTask(long id, String type, String action, TaskId parentTaskId, Map headers) { + return new CancellableTask(id, type, action, "Streams status request", parentTaskId, headers); } } diff --git a/modules/streams/src/main/java/org/elasticsearch/rest/streams/logs/TransportStreamsStatusAction.java b/modules/streams/src/main/java/org/elasticsearch/rest/streams/logs/TransportStreamsStatusAction.java index e5a148c222174..ac10464fd030d 100644 --- a/modules/streams/src/main/java/org/elasticsearch/rest/streams/logs/TransportStreamsStatusAction.java +++ b/modules/streams/src/main/java/org/elasticsearch/rest/streams/logs/TransportStreamsStatusAction.java @@ -11,11 +11,10 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.ActionFilters; -import org.elasticsearch.action.support.master.TransportMasterNodeReadAction; -import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.action.support.local.TransportLocalProjectMetadataAction; +import org.elasticsearch.cluster.ProjectState; import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.block.ClusterBlockLevel; -import org.elasticsearch.cluster.metadata.ProjectId; import org.elasticsearch.cluster.project.ProjectResolver; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.injection.guice.Inject; @@ -28,9 +27,9 @@ * Transport action to retrieve the status of logs streams in a project / cluster. * Results are broken down by stream type. Currently only logs streams are implemented. */ -public class TransportStreamsStatusAction extends TransportMasterNodeReadAction { - - private final ProjectResolver projectResolver; +public class TransportStreamsStatusAction extends TransportLocalProjectMetadataAction< + StreamsStatusAction.Request, + StreamsStatusAction.Response> { @Inject public TransportStreamsStatusAction( @@ -42,33 +41,29 @@ public TransportStreamsStatusAction( ) { super( StreamsStatusAction.INSTANCE.name(), - transportService, - clusterService, - threadPool, actionFilters, - StreamsStatusAction.Request::new, - StreamsStatusAction.Response::new, - threadPool.executor(ThreadPool.Names.MANAGEMENT) + transportService.getTaskManager(), + clusterService, + threadPool.executor(ThreadPool.Names.MANAGEMENT), + projectResolver ); - this.projectResolver = projectResolver; } @Override - protected void masterOperation( + protected ClusterBlockException checkBlock(StreamsStatusAction.Request request, ProjectState state) { + return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_READ); + } + + @Override + protected void localClusterStateOperation( Task task, StreamsStatusAction.Request request, - ClusterState state, + ProjectState state, ActionListener listener - ) throws Exception { - ProjectId projectId = projectResolver.getProjectId(); - StreamsMetadata streamsState = state.projectState(projectId).metadata().custom(StreamsMetadata.TYPE, StreamsMetadata.EMPTY); + ) { + StreamsMetadata streamsState = state.metadata().custom(StreamsMetadata.TYPE, StreamsMetadata.EMPTY); boolean logsEnabled = streamsState.isLogsEnabled(); StreamsStatusAction.Response response = new StreamsStatusAction.Response(logsEnabled); listener.onResponse(response); } - - @Override - protected ClusterBlockException checkBlock(StreamsStatusAction.Request request, ClusterState state) { - return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_READ); - } } diff --git a/rest-api-spec/src/main/resources/rest-api-spec/api/streams.status.json b/rest-api-spec/src/main/resources/rest-api-spec/api/streams.status.json index 784064249ecc7..677b88068f21b 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/api/streams.status.json +++ b/rest-api-spec/src/main/resources/rest-api-spec/api/streams.status.json @@ -23,9 +23,9 @@ ] }, "params": { - "master_timeout": { + "timeout": { "type": "time", - "description": "Period to wait for a connection to the master node. If no response is received before the timeout expires, the request fails and returns an error." + "description": "Period to wait for a response. If no response is received before the timeout expires, the request fails and returns an error." } } } diff --git a/server/src/main/java/org/elasticsearch/TransportVersions.java b/server/src/main/java/org/elasticsearch/TransportVersions.java index 848215573bc03..98bcdc2099381 100644 --- a/server/src/main/java/org/elasticsearch/TransportVersions.java +++ b/server/src/main/java/org/elasticsearch/TransportVersions.java @@ -299,6 +299,7 @@ static TransportVersion def(int id) { public static final TransportVersion NONE_CHUNKING_STRATEGY = def(9_097_0_00); public static final TransportVersion PROJECT_DELETION_GLOBAL_BLOCK = def(9_098_0_00); public static final TransportVersion STREAMS_LOGS_SUPPORT = def(9_099_0_00); + /* * STOP! READ THIS FIRST! No, really, * ____ _____ ___ ____ _ ____ _____ _ ____ _____ _ _ ___ ____ _____ ___ ____ ____ _____ _ diff --git a/x-pack/qa/multi-project/core-rest-tests-with-multiple-projects/build.gradle b/x-pack/qa/multi-project/core-rest-tests-with-multiple-projects/build.gradle index 8e5462059792f..c79c3be156c24 100644 --- a/x-pack/qa/multi-project/core-rest-tests-with-multiple-projects/build.gradle +++ b/x-pack/qa/multi-project/core-rest-tests-with-multiple-projects/build.gradle @@ -1,4 +1,13 @@ apply plugin: 'elasticsearch.internal-yaml-rest-test' +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + import org.elasticsearch.gradle.util.GradleUtils dependencies { @@ -21,6 +30,7 @@ dependencies { restTestConfig project(path: ':modules:data-streams', configuration: "basicRestSpecs") restTestConfig project(path: ':modules:ingest-common', configuration: "basicRestSpecs") restTestConfig project(path: ':modules:reindex', configuration: "basicRestSpecs") + restTestConfig project(path: ':modules:streams', configuration: "basicRestSpecs") } // let the yamlRestTests see the classpath of test From 8686648bd033b1171cae251e0115d24470661347 Mon Sep 17 00:00:00 2001 From: Luke Whiting Date: Tue, 17 Jun 2025 15:30:21 +0100 Subject: [PATCH 15/21] PR Fixes - Make enable / disable endpoint cancellable --- .../logs/LogsStreamsActivationToggleAction.java | 15 +++++++++------ .../logs/RestSetLogStreamsEnabledAction.java | 3 ++- 2 files changed, 11 insertions(+), 7 deletions(-) diff --git a/modules/streams/src/main/java/org/elasticsearch/rest/streams/logs/LogsStreamsActivationToggleAction.java b/modules/streams/src/main/java/org/elasticsearch/rest/streams/logs/LogsStreamsActivationToggleAction.java index 248e6bd7b90df..07eda61784c8f 100644 --- a/modules/streams/src/main/java/org/elasticsearch/rest/streams/logs/LogsStreamsActivationToggleAction.java +++ b/modules/streams/src/main/java/org/elasticsearch/rest/streams/logs/LogsStreamsActivationToggleAction.java @@ -9,15 +9,18 @@ package org.elasticsearch.rest.streams.logs; -import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.ActionType; import org.elasticsearch.action.support.master.AcknowledgedRequest; import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.core.TimeValue; +import org.elasticsearch.tasks.CancellableTask; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.tasks.TaskId; import java.io.IOException; +import java.util.Map; public class LogsStreamsActivationToggleAction { @@ -43,11 +46,6 @@ public void writeTo(StreamOutput out) throws IOException { out.writeBoolean(enable); } - @Override - public ActionRequestValidationException validate() { - return null; - } - @Override public String toString() { return "LogsStreamsActivationToggleAction.Request{" + "enable=" + enable + '}'; @@ -56,5 +54,10 @@ public String toString() { public boolean shouldEnable() { return enable; } + + @Override + public Task createTask(String localNodeId, long id, String type, String action, TaskId parentTaskId, Map headers) { + return new CancellableTask(id, type, action, "Logs streams activation toggle request", parentTaskId, headers); + } } } diff --git a/modules/streams/src/main/java/org/elasticsearch/rest/streams/logs/RestSetLogStreamsEnabledAction.java b/modules/streams/src/main/java/org/elasticsearch/rest/streams/logs/RestSetLogStreamsEnabledAction.java index 2de2a3a21b9f8..cdf429ccf02ef 100644 --- a/modules/streams/src/main/java/org/elasticsearch/rest/streams/logs/RestSetLogStreamsEnabledAction.java +++ b/modules/streams/src/main/java/org/elasticsearch/rest/streams/logs/RestSetLogStreamsEnabledAction.java @@ -15,6 +15,7 @@ import org.elasticsearch.rest.RestUtils; import org.elasticsearch.rest.Scope; import org.elasticsearch.rest.ServerlessScope; +import org.elasticsearch.rest.action.RestCancellableNodeClient; import org.elasticsearch.rest.action.RestToXContentListener; import java.util.List; @@ -48,7 +49,7 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli enabled ); - return restChannel -> client.execute( + return restChannel -> new RestCancellableNodeClient(client, request.getHttpChannel()).execute( LogsStreamsActivationToggleAction.INSTANCE, activationRequest, new RestToXContentListener<>(restChannel) From 9991a0daa4a9895aa630636d766f6f4d91f0314e Mon Sep 17 00:00:00 2001 From: Luke Whiting Date: Tue, 17 Jun 2025 16:06:28 +0100 Subject: [PATCH 16/21] PR Fixes - Switch timeout param name for status endpoint --- .../rest/streams/logs/RestStreamsStatusAction.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/modules/streams/src/main/java/org/elasticsearch/rest/streams/logs/RestStreamsStatusAction.java b/modules/streams/src/main/java/org/elasticsearch/rest/streams/logs/RestStreamsStatusAction.java index 65adec761689c..a9b3f5b1efe7f 100644 --- a/modules/streams/src/main/java/org/elasticsearch/rest/streams/logs/RestStreamsStatusAction.java +++ b/modules/streams/src/main/java/org/elasticsearch/rest/streams/logs/RestStreamsStatusAction.java @@ -28,7 +28,7 @@ @ServerlessScope(Scope.PUBLIC) public class RestStreamsStatusAction extends BaseRestHandler { - public static final Set SUPPORTED_PARAMS = Collections.singleton(RestUtils.REST_TIMEOUT_PARAM); + public static final Set SUPPORTED_PARAMS = Collections.singleton(RestUtils.REST_MASTER_TIMEOUT_PARAM); @Override public String getName() { @@ -42,7 +42,7 @@ public List routes() { @Override protected BaseRestHandler.RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) { - StreamsStatusAction.Request statusRequest = new StreamsStatusAction.Request(RestUtils.getAckTimeout(request)); + StreamsStatusAction.Request statusRequest = new StreamsStatusAction.Request(RestUtils.getMasterNodeTimeout(request)); return restChannel -> new RestCancellableNodeClient(client, request.getHttpChannel()).execute( StreamsStatusAction.INSTANCE, statusRequest, From faf7dabf1b86310f3e144c57d85de8ffcec05988 Mon Sep 17 00:00:00 2001 From: Luke Whiting Date: Tue, 17 Jun 2025 16:23:58 +0100 Subject: [PATCH 17/21] PR Fixes - Switch timeout param name for status endpoint in spec --- .../src/main/resources/rest-api-spec/api/streams.status.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rest-api-spec/src/main/resources/rest-api-spec/api/streams.status.json b/rest-api-spec/src/main/resources/rest-api-spec/api/streams.status.json index 677b88068f21b..722ddbee6675f 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/api/streams.status.json +++ b/rest-api-spec/src/main/resources/rest-api-spec/api/streams.status.json @@ -23,7 +23,7 @@ ] }, "params": { - "timeout": { + "mater_timeout": { "type": "time", "description": "Period to wait for a response. If no response is received before the timeout expires, the request fails and returns an error." } From 2204296e01314788de97d014cb53ed7598ffeba3 Mon Sep 17 00:00:00 2001 From: Luke Whiting Date: Wed, 18 Jun 2025 09:31:54 +0100 Subject: [PATCH 18/21] PR Fixes - Enforce local only use for status action --- .../rest/streams/logs/StreamsStatusAction.java | 8 ++------ .../logs/TransportLogsStreamsToggleActivation.java | 4 ++-- ...utor.java => SequentialAckingBatchedTaskExecutor.java} | 3 ++- 3 files changed, 6 insertions(+), 9 deletions(-) rename server/src/main/java/org/elasticsearch/cluster/{SequentialTaskAckingTaskExecutor.java => SequentialAckingBatchedTaskExecutor.java} (88%) diff --git a/modules/streams/src/main/java/org/elasticsearch/rest/streams/logs/StreamsStatusAction.java b/modules/streams/src/main/java/org/elasticsearch/rest/streams/logs/StreamsStatusAction.java index 76f87ad25b47b..95a1783ac0452 100644 --- a/modules/streams/src/main/java/org/elasticsearch/rest/streams/logs/StreamsStatusAction.java +++ b/modules/streams/src/main/java/org/elasticsearch/rest/streams/logs/StreamsStatusAction.java @@ -11,8 +11,8 @@ import org.elasticsearch.action.ActionResponse; import org.elasticsearch.action.ActionType; +import org.elasticsearch.action.support.TransportAction; import org.elasticsearch.action.support.local.LocalClusterStateRequest; -import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.core.TimeValue; import org.elasticsearch.tasks.CancellableTask; @@ -47,13 +47,9 @@ public Response(boolean logsEnabled) { logs_enabled = logsEnabled; } - public Response(StreamInput in) throws IOException { - logs_enabled = in.readBoolean(); - } - @Override public void writeTo(StreamOutput out) throws IOException { - out.writeBoolean(logs_enabled); + TransportAction.localOnly(); } @Override diff --git a/modules/streams/src/main/java/org/elasticsearch/rest/streams/logs/TransportLogsStreamsToggleActivation.java b/modules/streams/src/main/java/org/elasticsearch/rest/streams/logs/TransportLogsStreamsToggleActivation.java index c2a01ceeff948..700c434be0226 100644 --- a/modules/streams/src/main/java/org/elasticsearch/rest/streams/logs/TransportLogsStreamsToggleActivation.java +++ b/modules/streams/src/main/java/org/elasticsearch/rest/streams/logs/TransportLogsStreamsToggleActivation.java @@ -18,7 +18,7 @@ import org.elasticsearch.action.support.master.AcknowledgedTransportMasterNodeAction; import org.elasticsearch.cluster.AckedClusterStateUpdateTask; import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.SequentialTaskAckingTaskExecutor; +import org.elasticsearch.cluster.SequentialAckingBatchedTaskExecutor; import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.metadata.ProjectId; @@ -64,7 +64,7 @@ public TransportLogsStreamsToggleActivation( this.taskQueue = clusterService.createTaskQueue( "streams-update-state-queue", Priority.NORMAL, - new SequentialTaskAckingTaskExecutor<>() + new SequentialAckingBatchedTaskExecutor<>() ); this.projectResolver = projectResolver; } diff --git a/server/src/main/java/org/elasticsearch/cluster/SequentialTaskAckingTaskExecutor.java b/server/src/main/java/org/elasticsearch/cluster/SequentialAckingBatchedTaskExecutor.java similarity index 88% rename from server/src/main/java/org/elasticsearch/cluster/SequentialTaskAckingTaskExecutor.java rename to server/src/main/java/org/elasticsearch/cluster/SequentialAckingBatchedTaskExecutor.java index 7eee6d4c7a5bc..70f745c1b0c32 100644 --- a/server/src/main/java/org/elasticsearch/cluster/SequentialTaskAckingTaskExecutor.java +++ b/server/src/main/java/org/elasticsearch/cluster/SequentialAckingBatchedTaskExecutor.java @@ -18,7 +18,8 @@ * * @param The type of the task that extends {@link AckedClusterStateUpdateTask}. */ -public class SequentialTaskAckingTaskExecutor extends SimpleBatchedAckListenerTaskExecutor { +public class SequentialAckingBatchedTaskExecutor extends SimpleBatchedAckListenerTaskExecutor< + Task> { @Override public Tuple executeTask(Task task, ClusterState clusterState) throws Exception { return Tuple.tuple(task.execute(clusterState), task); From 9d3d54716258cc175d2d1e0ddb3ff51761850dcf Mon Sep 17 00:00:00 2001 From: Luke Whiting Date: Wed, 18 Jun 2025 09:59:58 +0100 Subject: [PATCH 19/21] PR Fixes - Refactor StreamsMetadata into server --- .../org/elasticsearch/rest/streams/TestToggleIT.java | 1 + .../org/elasticsearch/rest/streams/StreamsPlugin.java | 10 ---------- .../logs/TransportLogsStreamsToggleActivation.java | 2 +- .../streams/logs/TransportStreamsStatusAction.java | 2 +- .../java/org/elasticsearch/cluster/ClusterModule.java | 5 +++++ .../cluster/metadata}/StreamsMetadata.java | 3 +-- 6 files changed, 9 insertions(+), 14 deletions(-) rename {modules/streams/src/main/java/org/elasticsearch/rest/streams => server/src/main/java/org/elasticsearch/cluster/metadata}/StreamsMetadata.java (97%) diff --git a/modules/streams/src/internalClusterTest/java/org/elasticsearch/rest/streams/TestToggleIT.java b/modules/streams/src/internalClusterTest/java/org/elasticsearch/rest/streams/TestToggleIT.java index c6f00c855b05e..be4a5a33cfa9f 100644 --- a/modules/streams/src/internalClusterTest/java/org/elasticsearch/rest/streams/TestToggleIT.java +++ b/modules/streams/src/internalClusterTest/java/org/elasticsearch/rest/streams/TestToggleIT.java @@ -14,6 +14,7 @@ import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.cluster.metadata.ProjectId; import org.elasticsearch.cluster.metadata.ProjectMetadata; +import org.elasticsearch.cluster.metadata.StreamsMetadata; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.rest.streams.logs.LogsStreamsActivationToggleAction; import org.elasticsearch.test.ESIntegTestCase; diff --git a/modules/streams/src/main/java/org/elasticsearch/rest/streams/StreamsPlugin.java b/modules/streams/src/main/java/org/elasticsearch/rest/streams/StreamsPlugin.java index acf807c42873f..4e3a531e20d56 100644 --- a/modules/streams/src/main/java/org/elasticsearch/rest/streams/StreamsPlugin.java +++ b/modules/streams/src/main/java/org/elasticsearch/rest/streams/StreamsPlugin.java @@ -9,10 +9,8 @@ package org.elasticsearch.rest.streams; -import org.elasticsearch.cluster.NamedDiff; import org.elasticsearch.cluster.metadata.DataStream; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; -import org.elasticsearch.cluster.metadata.Metadata; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.settings.ClusterSettings; @@ -68,12 +66,4 @@ public List getActions() { new ActionHandler(StreamsStatusAction.INSTANCE, TransportStreamsStatusAction.class) ); } - - @Override - public List getNamedWriteables() { - return List.of( - new NamedWriteableRegistry.Entry(Metadata.ProjectCustom.class, StreamsMetadata.TYPE, StreamsMetadata::new), - new NamedWriteableRegistry.Entry(NamedDiff.class, StreamsMetadata.TYPE, StreamsMetadata::readDiffFrom) - ); - } } diff --git a/modules/streams/src/main/java/org/elasticsearch/rest/streams/logs/TransportLogsStreamsToggleActivation.java b/modules/streams/src/main/java/org/elasticsearch/rest/streams/logs/TransportLogsStreamsToggleActivation.java index 700c434be0226..95b71fb73b252 100644 --- a/modules/streams/src/main/java/org/elasticsearch/rest/streams/logs/TransportLogsStreamsToggleActivation.java +++ b/modules/streams/src/main/java/org/elasticsearch/rest/streams/logs/TransportLogsStreamsToggleActivation.java @@ -22,12 +22,12 @@ import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.metadata.ProjectId; +import org.elasticsearch.cluster.metadata.StreamsMetadata; import org.elasticsearch.cluster.project.ProjectResolver; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.cluster.service.MasterServiceTaskQueue; import org.elasticsearch.common.Priority; import org.elasticsearch.injection.guice.Inject; -import org.elasticsearch.rest.streams.StreamsMetadata; import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; diff --git a/modules/streams/src/main/java/org/elasticsearch/rest/streams/logs/TransportStreamsStatusAction.java b/modules/streams/src/main/java/org/elasticsearch/rest/streams/logs/TransportStreamsStatusAction.java index ac10464fd030d..e389dd8c402bd 100644 --- a/modules/streams/src/main/java/org/elasticsearch/rest/streams/logs/TransportStreamsStatusAction.java +++ b/modules/streams/src/main/java/org/elasticsearch/rest/streams/logs/TransportStreamsStatusAction.java @@ -15,10 +15,10 @@ import org.elasticsearch.cluster.ProjectState; import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.block.ClusterBlockLevel; +import org.elasticsearch.cluster.metadata.StreamsMetadata; import org.elasticsearch.cluster.project.ProjectResolver; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.injection.guice.Inject; -import org.elasticsearch.rest.streams.StreamsMetadata; import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; diff --git a/server/src/main/java/org/elasticsearch/cluster/ClusterModule.java b/server/src/main/java/org/elasticsearch/cluster/ClusterModule.java index 33a7139213f93..c4bc58b3d1831 100644 --- a/server/src/main/java/org/elasticsearch/cluster/ClusterModule.java +++ b/server/src/main/java/org/elasticsearch/cluster/ClusterModule.java @@ -26,6 +26,7 @@ import org.elasticsearch.cluster.metadata.MetadataMappingService; import org.elasticsearch.cluster.metadata.NodesShutdownMetadata; import org.elasticsearch.cluster.metadata.RepositoriesMetadata; +import org.elasticsearch.cluster.metadata.StreamsMetadata; import org.elasticsearch.cluster.project.ProjectResolver; import org.elasticsearch.cluster.routing.DelayedAllocationService; import org.elasticsearch.cluster.routing.ShardRouting; @@ -300,6 +301,10 @@ public static List getNamedWriteables() { // Health API entries.addAll(HealthNodeTaskExecutor.getNamedWriteables()); entries.addAll(HealthMetadataService.getNamedWriteables()); + + // Streams + registerProjectCustom(entries, StreamsMetadata.TYPE, StreamsMetadata::new, StreamsMetadata::readDiffFrom); + return entries; } diff --git a/modules/streams/src/main/java/org/elasticsearch/rest/streams/StreamsMetadata.java b/server/src/main/java/org/elasticsearch/cluster/metadata/StreamsMetadata.java similarity index 97% rename from modules/streams/src/main/java/org/elasticsearch/rest/streams/StreamsMetadata.java rename to server/src/main/java/org/elasticsearch/cluster/metadata/StreamsMetadata.java index 529dd97673ed3..71dd9eeaff24f 100644 --- a/modules/streams/src/main/java/org/elasticsearch/rest/streams/StreamsMetadata.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/StreamsMetadata.java @@ -7,13 +7,12 @@ * License v3.0 only", or the "Server Side Public License, v 1". */ -package org.elasticsearch.rest.streams; +package org.elasticsearch.cluster.metadata; import org.elasticsearch.TransportVersion; import org.elasticsearch.TransportVersions; import org.elasticsearch.cluster.AbstractNamedDiffable; import org.elasticsearch.cluster.NamedDiff; -import org.elasticsearch.cluster.metadata.Metadata; import org.elasticsearch.common.collect.Iterators; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; From 9501023d7d509954c1dcf3b85083f418e95c25c7 Mon Sep 17 00:00:00 2001 From: Luke Whiting Date: Wed, 18 Jun 2025 11:16:41 +0100 Subject: [PATCH 20/21] PR Fixes - Add streams module to multi project YAML test suite --- .../test/CoreWithMultipleProjectsClientYamlTestSuiteIT.java | 1 + 1 file changed, 1 insertion(+) diff --git a/x-pack/qa/multi-project/core-rest-tests-with-multiple-projects/src/yamlRestTest/java/org/elasticsearch/multiproject/test/CoreWithMultipleProjectsClientYamlTestSuiteIT.java b/x-pack/qa/multi-project/core-rest-tests-with-multiple-projects/src/yamlRestTest/java/org/elasticsearch/multiproject/test/CoreWithMultipleProjectsClientYamlTestSuiteIT.java index 1ec091c84553c..da0432a3e3c5b 100644 --- a/x-pack/qa/multi-project/core-rest-tests-with-multiple-projects/src/yamlRestTest/java/org/elasticsearch/multiproject/test/CoreWithMultipleProjectsClientYamlTestSuiteIT.java +++ b/x-pack/qa/multi-project/core-rest-tests-with-multiple-projects/src/yamlRestTest/java/org/elasticsearch/multiproject/test/CoreWithMultipleProjectsClientYamlTestSuiteIT.java @@ -37,6 +37,7 @@ public class CoreWithMultipleProjectsClientYamlTestSuiteIT extends MultipleProje .module("test-multi-project") .module("lang-mustache") .module("parent-join") + .module("streams") .setting("test.multi_project.enabled", "true") .setting("xpack.security.enabled", "true") .setting("xpack.watcher.enabled", "false") From 22db6f959e0dc00c83221c73e2cc10285b6a03ee Mon Sep 17 00:00:00 2001 From: Luke Whiting Date: Wed, 18 Jun 2025 15:02:42 +0100 Subject: [PATCH 21/21] PR Fixes - Add streams cluster module to multi project YAML test suite --- .../core-rest-tests-with-multiple-projects/build.gradle | 1 + 1 file changed, 1 insertion(+) diff --git a/x-pack/qa/multi-project/core-rest-tests-with-multiple-projects/build.gradle b/x-pack/qa/multi-project/core-rest-tests-with-multiple-projects/build.gradle index c79c3be156c24..39115556d290f 100644 --- a/x-pack/qa/multi-project/core-rest-tests-with-multiple-projects/build.gradle +++ b/x-pack/qa/multi-project/core-rest-tests-with-multiple-projects/build.gradle @@ -22,6 +22,7 @@ dependencies { clusterModules project(':modules:data-streams') clusterModules project(':modules:lang-mustache') clusterModules project(':modules:parent-join') + clusterModules project(':modules:streams') clusterModules project(xpackModule('stack')) clusterModules project(xpackModule('ilm')) clusterModules project(xpackModule('mapper-constant-keyword'))