@lukewhiting
Contributor

This pull request introduces a new module to Elasticsearch called streams.

This feature will provide "batteries included" ingest of bulk data with minimal user setup.

This initial PR introduces the new ES module that supports this feature, including:

  • Build config
  • REST endpoints to enable and disable the feature
  • A REST endpoint to check whether the feature is enabled
  • Appropriate actions to power those REST endpoints
  • A new cluster state object for storing feature state
  • REST and integration tests

Implements ES-11330

@elasticsearchmachine
Collaborator

Hi @lukewhiting, I've created a changelog YAML for you.

This comment was marked as outdated.

@lukewhiting lukewhiting marked this pull request as ready for review June 16, 2025 12:46
@elasticsearchmachine
Collaborator

Pinging @elastic/es-data-management (Team:Data Management)

@elasticsearchmachine elasticsearchmachine added the Team:Data Management Meta label for data/management team label Jun 16, 2025
@masseyke masseyke self-requested a review June 16, 2025 20:49
Contributor

@nielsbauman nielsbauman left a comment

I didn't look much at the logic or tests; I just left some comments on the general wiring of the new APIs.

Contributor

@nielsbauman nielsbauman left a comment

Aside from the timeout comment, the API wiring stuff LGTM 👍


@Override
protected BaseRestHandler.RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) {
StreamsStatusAction.Request statusRequest = new StreamsStatusAction.Request(RestUtils.getAckTimeout(request));
Contributor

This should use the masterNodeTimeout (same goes for the supported params above). Even though the request runs on the local node, we decided to keep the query param name the same (master_timeout) for BwC reasons, so I think it makes sense to use that name here too for consistency.
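
To make the naming point concrete, here is a standalone sketch in plain Java (it is not the Elasticsearch `RestUtils` code) of a handler reading its timeout from the `master_timeout` query parameter and falling back to a default. The class name `TimeoutParams`, the `parse` helper, the 30-second default, and the supported unit suffixes are all assumptions made for illustration.

```java
import java.time.Duration;
import java.util.Map;

// Hypothetical stand-in for a REST handler's parameter handling; not Elasticsearch code.
final class TimeoutParams {
    // The query parameter keeps the name "master_timeout" for backwards compatibility.
    static final String MASTER_TIMEOUT = "master_timeout";
    // Assumed default, chosen only for this sketch.
    static final Duration DEFAULT_TIMEOUT = Duration.ofSeconds(30);

    // Reads master_timeout from the query parameters, falling back to the default.
    static Duration masterTimeout(Map<String, String> queryParams) {
        String raw = queryParams.get(MASTER_TIMEOUT);
        return raw == null ? DEFAULT_TIMEOUT : parse(raw);
    }

    // Parses values such as "500ms", "10s" or "2m"; anything else is rejected.
    static Duration parse(String value) {
        if (value.endsWith("ms")) {
            return Duration.ofMillis(Long.parseLong(value.substring(0, value.length() - 2)));
        }
        if (value.endsWith("s")) {
            return Duration.ofSeconds(Long.parseLong(value.substring(0, value.length() - 1)));
        }
        if (value.endsWith("m")) {
            return Duration.ofMinutes(Long.parseLong(value.substring(0, value.length() - 1)));
        }
        throw new IllegalArgumentException("unsupported timeout value: " + value);
    }

    public static void main(String[] args) {
        System.out.println(masterTimeout(Map.of()));
        System.out.println(masterTimeout(Map.of(MASTER_TIMEOUT, "10s")));
    }
}
```

Keeping one parameter name across endpoints means callers do not need to know whether a given request is served by the master or by the local node.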

Contributor Author

Switched both to master node timeout 👍🏻

Contributor

Don't forget to also update the spec :)

Contributor Author

Too many moving parts 😅 Thanks for the reminder

@lukewhiting lukewhiting requested a review from Copilot June 17, 2025 15:25
Contributor

Copilot AI left a comment

Pull Request Overview

This PR introduces a new streams module that provides REST endpoints to enable, disable, and report the status of the logs streams feature, backed by a cluster-state metadata custom object and tested via both YAML and Java integration tests.

  • Add REST API specs, handlers, and transport actions for /_streams/logs/_enable, /_streams/logs/_disable, and /_streams/status.
  • Persist feature state in a new StreamsMetadata cluster-state custom object using a sequential, acking task executor.
  • Register the module in the build and multi-project test configurations, and add both YAML and internal-cluster tests.
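
For orientation, the three endpoints listed above could be exercised from a client roughly as follows. This is a minimal sketch that only builds the requests and does not send them. The host `http://localhost:9200`, the class name `StreamsRequests`, and the HTTP methods (POST for the toggles, GET for status) are assumptions, since this thread names only the paths.

```java
import java.net.URI;
import java.net.http.HttpRequest;
import java.util.List;

// Hypothetical client-side helper; only the three paths come from the PR overview.
final class StreamsRequests {
    // Assumed address of a local test cluster.
    static final String BASE = "http://localhost:9200";

    // Assumed to be a body-less POST.
    static HttpRequest enableLogs() {
        return HttpRequest.newBuilder(URI.create(BASE + "/_streams/logs/_enable"))
            .POST(HttpRequest.BodyPublishers.noBody())
            .build();
    }

    // Assumed to be a body-less POST.
    static HttpRequest disableLogs() {
        return HttpRequest.newBuilder(URI.create(BASE + "/_streams/logs/_disable"))
            .POST(HttpRequest.BodyPublishers.noBody())
            .build();
    }

    // Assumed to be a GET.
    static HttpRequest status() {
        return HttpRequest.newBuilder(URI.create(BASE + "/_streams/status"))
            .GET()
            .build();
    }

    public static void main(String[] args) {
        // Print method and path for each request without contacting a cluster.
        for (HttpRequest request : List.of(enableLogs(), disableLogs(), status())) {
            System.out.println(request.method() + " " + request.uri().getPath());
        }
    }
}
```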

Reviewed Changes

Copilot reviewed 20 out of 20 changed files in this pull request and generated no comments.

| File | Description |
| --- | --- |
| x-pack/qa/multi-project/core-rest-tests-with-multiple-projects/build.gradle | Include :modules:streams in YAML REST multi-project tests |
| x-pack/plugin/security/qa/operator-privileges-tests/.../Constants.java | Grant operator privileges for streams toggle and status |
| server/src/main/java/org/elasticsearch/cluster/SequentialTaskAckingTaskExecutor.java | New executor for sequential acked cluster-state update tasks |
| server/src/main/java/org/elasticsearch/TransportVersions.java | Add STREAMS_LOGS_SUPPORT transport version constant |
| rest-api-spec/src/main/resources/rest-api-spec/api/streams.*.json | Define API specs for streams enable, disable, and status |
| modules/streams/src/yamlRestTest/.../streams/logs/10_basic.yml | Add YAML tests for toggling and status checks |
| modules/streams/src/main/java/org/elasticsearch/rest/streams/logs/Transport*Action.java | Implement transport actions and REST handlers |
| modules/streams/src/main/java/org/elasticsearch/rest/streams/StreamsPlugin.java | Register REST handlers, actions, and named writeables |
| modules/streams/src/main/java/org/elasticsearch/rest/streams/StreamsMetadata.java | Define custom cluster-state metadata for streams feature |
| modules/streams/src/main/java/module-info.java | Module descriptor for the streams plugin |
| modules/streams/src/internalClusterTest/.../TestToggleIT.java | Add Java integration test for toggle behavior |

Comments suppressed due to low confidence (3)

modules/streams/src/main/java/module-info.java:10

  • The module name org.elasticsearch.rest.root is misleading; consider renaming it to org.elasticsearch.rest.streams to match the plugin package and functionality.
module org.elasticsearch.rest.root {

modules/streams/src/main/java/org/elasticsearch/rest/streams/logs/TransportLogsStreamsToggleActivation.java:85

  • [nitpick] The task name prefix enable-streams-logs can be confusing when disabling; consider using a neutral prefix like streams-logs-toggle-[%s] so both enable and disable modes read clearly.
            String taskName = String.format(Locale.ROOT, "enable-streams-logs-[%s]", shouldEnable ? "enable" : "disable");

modules/streams/src/main/java/module-info.java:12

  • The @Inject annotation from org.elasticsearch.injection.guice is used throughout the code but the module descriptor does not require the injection module; add requires org.elasticsearch.injection.guice (or the correct module) to avoid compilation errors.
    requires org.elasticsearch.xcontent;


@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeBoolean(logs_enabled);
Member

This ought to never be called, right? You could just add an assert false or something.

Contributor

Good catch! You can use TransportAction.localOnly() for that, if you want.

Contributor Author

Replaced this with a call to TransportAction.localOnly() and removed the redundant constructor.
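
The idea behind this change can be illustrated with a simplified model: a response that is only ever produced and consumed on the local node fails loudly if anything tries to serialize it. The sketch below is plain Java with hypothetical names (`LocalOnlyStatus`, its `writeTo` signature, and the exception message). It is not the Elasticsearch `TransportAction.localOnly()` implementation.

```java
import java.io.OutputStream;

// Hypothetical model of a "local-only" response; names are illustrative, not Elasticsearch classes.
final class LocalOnlyStatus {
    private final boolean logsEnabled;

    LocalOnlyStatus(boolean logsEnabled) {
        this.logsEnabled = logsEnabled;
    }

    boolean logsEnabled() {
        return logsEnabled;
    }

    // Serialization hook that must never run, because the response never leaves the node.
    void writeTo(OutputStream out) {
        throw new UnsupportedOperationException("local-only action");
    }

    public static void main(String[] args) {
        LocalOnlyStatus status = new LocalOnlyStatus(true);
        System.out.println("logs enabled: " + status.logsEnabled());
        try {
            status.writeTo(System.out);
        } catch (UnsupportedOperationException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Failing fast here turns an accidental remote call into an immediate, visible error rather than a silently unsupported wire format.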

logs_enabled = logsEnabled;
}

public Response(StreamInput in) throws IOException {
Member

This is never called, right? And this is always local? So I think you could just delete it.

Contributor Author

Removed :-)

*/
public class SequentialTaskAckingTaskExecutor<Task extends AckedClusterStateUpdateTask> extends SimpleBatchedAckListenerTaskExecutor<Task> {
@Override
public Tuple<ClusterState, ClusterStateAckListener> executeTask(Task task, ClusterState clusterState) throws Exception {
Member

Just talking myself through what you've probably already considered: there's no need to use an AckedBatchedClusterStateUpdateTask to batch (deduplicate) these, because this action is unlikely to be called more than once in the lifetime of a cluster.

Contributor

@masseyke This executor class extends SimpleBatchedAckListenerTaskExecutor, so these cluster state updates will be batched. Maybe the name (SequentialTaskAckingTaskExecutor) should include the word "batch" somewhere to indicate that?

Member

Hmm I think I'm just confused between what AckedBatchedClusterStateUpdateTask and AckedClusterStateUpdateTask are for. But regardless, it looks fine.

Contributor Author

Renamed this class to SequentialAckingBatchedTaskExecutor to avoid future confusion
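
The distinction discussed in this thread (tasks arrive as one batch but are still applied one after another) can be sketched with a small model. Everything below is hypothetical plain Java: the `SequentialBatchSketch` class, the `Boolean` standing in for cluster state, and the `acked` list standing in for per-task ack listeners. Real cluster-state publication and acknowledgement are not modeled.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.UnaryOperator;

// Hypothetical model: one batch of tasks, applied in submission order to a simple state.
final class SequentialBatchSketch {
    // A named task that maps the current "logs enabled" flag to a new value.
    static final class Task {
        final String name;
        final UnaryOperator<Boolean> update;

        Task(String name, UnaryOperator<Boolean> update) {
            this.name = name;
            this.update = update;
        }
    }

    // Applies every task in order; each task sees the result of its predecessor.
    // The name of each applied task is appended to 'acked' as a stand-in for an ack listener.
    static boolean executeBatch(boolean initial, List<Task> batch, List<String> acked) {
        boolean current = initial;
        for (Task task : batch) {
            current = task.update.apply(current);
            acked.add(task.name);
        }
        return current;
    }

    public static void main(String[] args) {
        List<String> acked = new ArrayList<>();
        List<Task> batch = List.of(
            new Task("enable", enabled -> true),
            new Task("disable", enabled -> false)
        );
        boolean result = executeBatch(false, batch, acked);
        System.out.println("logs enabled: " + result + ", acked: " + acked);
    }
}
```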

ActionListener<AcknowledgedResponse> listener
) throws Exception {
ProjectId projectId = projectResolver.getProjectId();
StreamsMetadata streamsState = state.projectState(projectId).metadata().custom(StreamsMetadata.TYPE, StreamsMetadata.EMPTY);
Member

Since StreamsMetadata is in a module, and we need to be able to check this from server (within the bulk transport action), do we need to move StreamsMetadata to server? Or have some way to expose it in there?

Contributor Author

Yep that's probably a good idea. Have moved the metadata object into server.

restTestConfig project(path: ':modules:data-streams', configuration: "basicRestSpecs")
restTestConfig project(path: ':modules:ingest-common', configuration: "basicRestSpecs")
restTestConfig project(path: ':modules:reindex', configuration: "basicRestSpecs")
restTestConfig project(path: ':modules:streams', configuration: "basicRestSpecs")
Member

I think you've also got to have

clusterModules project(':modules:streams')

Member

@masseyke masseyke left a comment

LGTM once the build works.

@lukewhiting lukewhiting merged commit 1ccf1c6 into elastic:main Jun 19, 2025
27 checks passed
masseyke pushed a commit to masseyke/elasticsearch that referenced this pull request Jun 20, 2025
* Enable And Disable Endpoint

* Status Endpoint

* Integration Tests

* REST Spec

* REST Spec tests

* Some documentation

* Update docs/changelog/129474.yaml

* Fix failing security test

* PR Fixes

* PR Fixes - Add missing feature flag name to YAML spec

* PR Fixes - Fix support for timeout and master_timeout parameters

* PR Fixes - Make the REST handler validation happy with the new params

* Delete docs/changelog/129474.yaml

* PR Fixes - Switch to local metadata action type and improve request handling

* PR Fixes - Make enable / disable endpoint cancellable

* PR Fixes - Switch timeout param name for status endpoint

* PR Fixes - Switch timeout param name for status endpoint in spec

* PR Fixes - Enforce local only use for status action

* PR Fixes - Refactor StreamsMetadata into server

* PR Fixes - Add streams module to multi project YAML test suite

* PR Fixes - Add streams cluster module to multi project YAML test suite
lukewhiting added a commit to lukewhiting/elasticsearch that referenced this pull request Jun 23, 2025
lukewhiting added a commit that referenced this pull request Jun 23, 2025
) (#129838)

* Streams - Log's Enable, Disable and Status endpoints (#129474)

* Enable And Disable Endpoint

* Status Endpoint

* Integration Tests

* REST Spec

* REST Spec tests

* Some documentation

* Update docs/changelog/129474.yaml

* Fix failing security test

* PR Fixes

* PR Fixes - Add missing feature flag name to YAML spec

* PR Fixes - Fix support for timeout and master_timeout parameters

* PR Fixes - Make the REST handler validation happy with the new params

* Delete docs/changelog/129474.yaml

* PR Fixes - Switch to local metadata action type and improve request handling

* PR Fixes - Make enable / disable endpoint cancellable

* PR Fixes - Switch timeout param name for status endpoint

* PR Fixes - Switch timeout param name for status endpoint in spec

* PR Fixes - Enforce local only use for status action

* PR Fixes - Refactor StreamsMetadata into server

* PR Fixes - Add streams module to multi project YAML test suite

* PR Fixes - Add streams cluster module to multi project YAML test suite

* Null out reader in status transport action super constructor

* Added comment about minimum version support
kderusso pushed a commit to kderusso/elasticsearch that referenced this pull request Jun 23, 2025
@lukewhiting lukewhiting deleted the logs-stream-enable-endpoints branch June 24, 2025 09:26
mridula-s109 pushed a commit to mridula-s109/elasticsearch that referenced this pull request Jun 25, 2025
Labels

:Data Management/Data streams Data streams and their lifecycles >non-issue Team:Data Management Meta label for data/management team v8.19.0 v9.1.0

4 participants