Skip to content
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
63f07aa
Enable And Disable Endpoint
lukewhiting May 21, 2025
2fa713f
Status Endpoint
lukewhiting Jun 10, 2025
5dc679c
Integration Tests
lukewhiting Jun 12, 2025
54d5730
REST Spec
lukewhiting Jun 12, 2025
7b7d52a
REST Spec tests
lukewhiting Jun 12, 2025
6ed2fdd
Some documentation
lukewhiting Jun 12, 2025
130f5b8
Update docs/changelog/129474.yaml
lukewhiting Jun 16, 2025
2b7ad57
Fix failing security test
lukewhiting Jun 16, 2025
e601c4d
PR Fixes
lukewhiting Jun 17, 2025
323caca
PR Fixes - Add missing feature flag name to YAML spec
lukewhiting Jun 17, 2025
c65a5fe
PR Fixes - Fix support for timeout and master_timeout parameters
lukewhiting Jun 17, 2025
8fa2ccd
PR Fixes - Make the REST handler validation happy with the new params
lukewhiting Jun 17, 2025
3f244d3
Delete docs/changelog/129474.yaml
lukewhiting Jun 17, 2025
6d1b654
PR Fixes - Switch to local metadata action type and improve request h…
lukewhiting Jun 17, 2025
88165ac
Merge remote-tracking branch 'origin/logs-stream-enable-endpoints' in…
lukewhiting Jun 17, 2025
8686648
PR Fixes - Make enable / disable endpoint cancellable
lukewhiting Jun 17, 2025
9991a0d
PR Fixes - Switch timeout param name for status endpoint
lukewhiting Jun 17, 2025
5a95155
Merge branch 'main' of github.com:elastic/elasticsearch into logs-str…
lukewhiting Jun 17, 2025
faf7dab
PR Fixes - Switch timeout param name for status endpoint in spec
lukewhiting Jun 17, 2025
2204296
PR Fixes - Enforce local only use for status action
lukewhiting Jun 18, 2025
9d3d547
PR Fixes - Refactor StreamsMetadata into server
lukewhiting Jun 18, 2025
9501023
PR Fixes - Add streams module to multi project YAML test suite
lukewhiting Jun 18, 2025
6815393
Merge branch 'main' of github.com:elastic/elasticsearch into logs-str…
lukewhiting Jun 18, 2025
22db6f9
PR Fixes - Add streams cluster module to multi project YAML test suite
lukewhiting Jun 18, 2025
1e48edc
Merge branch 'main' of github.com:elastic/elasticsearch into logs-str…
lukewhiting Jun 19, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/129474.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 129474
summary: "Streams - Log's Enable, Disable and Status endpoints"
area: Data streams
type: feature
issues: []
33 changes: 33 additions & 0 deletions modules/streams/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
apply plugin: 'elasticsearch.test-with-dependencies'
apply plugin: 'elasticsearch.internal-cluster-test'
apply plugin: 'elasticsearch.internal-yaml-rest-test'
apply plugin: 'elasticsearch.internal-java-rest-test'
apply plugin: 'elasticsearch.yaml-rest-compat-test'

esplugin {
description = 'The module adds support for the wired streams functionality including logs ingest'
classname = 'org.elasticsearch.rest.streams.StreamsPlugin'
}

restResources {
restApi {
// TODO: Limit this down to just required API's for faster build. See https://github.com/elastic/elasticsearch/blob/fb3149cc664eb7061d55741a87e7e8cf29db4989/TESTING.asciidoc#L443
include '*'
}
}

configurations {
basicRestSpecs {
attributes {
attribute(ArtifactTypeDefinition.ARTIFACT_TYPE_ATTRIBUTE, ArtifactTypeDefinition.DIRECTORY_TYPE)
}
}
}

artifacts {
basicRestSpecs(new File(projectDir, "src/yamlRestTest/resources/rest-api-spec/test"))
}

dependencies {
testImplementation project(path: ':test:test-clusters')
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.rest.streams;

import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.cluster.metadata.ProjectMetadata;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.rest.streams.logs.LogsStreamsActivationToggleAction;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.hamcrest.ElasticsearchAssertions;
import org.elasticsearch.test.transport.MockTransportService;

import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.ExecutionException;

import static org.hamcrest.Matchers.is;

public class TestToggleIT extends ESIntegTestCase {

@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return List.of(StreamsPlugin.class, MockTransportService.TestPlugin.class);
}

public void testLogStreamToggle() throws IOException, ExecutionException, InterruptedException {
boolean[] testParams = new boolean[] { true, false, true };
for (boolean enable : testParams) {
doLogStreamToggleTest(enable);
}
}

private void doLogStreamToggleTest(boolean enable) throws IOException, ExecutionException, InterruptedException {
LogsStreamsActivationToggleAction.Request request = new LogsStreamsActivationToggleAction.Request(
TEST_REQUEST_TIMEOUT,
TEST_REQUEST_TIMEOUT,
enable
);

AcknowledgedResponse acknowledgedResponse = client().execute(LogsStreamsActivationToggleAction.INSTANCE, request).get();
ElasticsearchAssertions.assertAcked(acknowledgedResponse);

ClusterStateRequest state = new ClusterStateRequest(TEST_REQUEST_TIMEOUT);
ClusterStateResponse clusterStateResponse = client().admin().cluster().state(state).get();
ProjectMetadata projectMetadata = clusterStateResponse.getState().metadata().getProject(ProjectId.DEFAULT);

assertThat(projectMetadata.<StreamsMetadata>custom(StreamsMetadata.TYPE).isLogsEnabled(), is(enable));
}

}
19 changes: 19 additions & 0 deletions modules/streams/src/main/java/module-info.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

module org.elasticsearch.rest.root {
requires org.elasticsearch.server;
requires org.elasticsearch.xcontent;
requires org.apache.lucene.core;
requires org.elasticsearch.base;
requires org.apache.logging.log4j;

exports org.elasticsearch.rest.streams;
exports org.elasticsearch.rest.streams.logs;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.rest.streams;

import org.elasticsearch.TransportVersion;
import org.elasticsearch.TransportVersions;
import org.elasticsearch.cluster.AbstractNamedDiffable;
import org.elasticsearch.cluster.NamedDiff;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.common.collect.Iterators;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.ChunkedToXContentHelper;
import org.elasticsearch.xcontent.ToXContent;

import java.io.IOException;
import java.util.EnumSet;
import java.util.Iterator;
import java.util.Objects;

/**
* Metadata for the Streams feature, which allows enabling or disabling logs for data streams.
* This class implements the Metadata.ProjectCustom interface to allow it to be stored in the cluster state.
*/
public class StreamsMetadata extends AbstractNamedDiffable<Metadata.ProjectCustom> implements Metadata.ProjectCustom {

public static final String TYPE = "streams";
public static final StreamsMetadata EMPTY = new StreamsMetadata(false);

public boolean logsEnabled;

public StreamsMetadata(StreamInput in) throws IOException {
logsEnabled = in.readBoolean();
}

public StreamsMetadata(boolean logsEnabled) {
this.logsEnabled = logsEnabled;
}

public boolean isLogsEnabled() {
return logsEnabled;
}

@Override
public EnumSet<Metadata.XContentContext> context() {
return Metadata.ALL_CONTEXTS;
}

@Override
public String getWriteableName() {
return TYPE;
}

@Override
public TransportVersion getMinimalSupportedVersion() {
return TransportVersions.STREAMS_LOGS_SUPPORT;
}

public static NamedDiff<Metadata.ProjectCustom> readDiffFrom(StreamInput in) throws IOException {
return readDiffFrom(Metadata.ProjectCustom.class, TYPE, in);
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeBoolean(logsEnabled);
}

@Override
public Iterator<? extends ToXContent> toXContentChunked(ToXContent.Params params) {
return Iterators.concat(ChunkedToXContentHelper.chunk((builder, bParams) -> builder.field("logs_enabled", logsEnabled)));
}

@Override
public boolean equals(Object o) {
if ((o instanceof StreamsMetadata that)) {
return logsEnabled == that.logsEnabled;
} else {
return false;
}
}

@Override
public int hashCode() {
return Objects.hashCode(logsEnabled);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.rest.streams;

import org.elasticsearch.cluster.NamedDiff;
import org.elasticsearch.cluster.metadata.DataStream;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.IndexScopedSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.settings.SettingsFilter;
import org.elasticsearch.features.NodeFeature;
import org.elasticsearch.plugins.ActionPlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestHandler;
import org.elasticsearch.rest.streams.logs.LogsStreamsActivationToggleAction;
import org.elasticsearch.rest.streams.logs.RestSetLogStreamsEnabledAction;
import org.elasticsearch.rest.streams.logs.RestStreamsStatusAction;
import org.elasticsearch.rest.streams.logs.StreamsStatusAction;
import org.elasticsearch.rest.streams.logs.TransportLogsStreamsToggleActivation;
import org.elasticsearch.rest.streams.logs.TransportStreamsStatusAction;

import java.util.Collections;
import java.util.List;
import java.util.function.Predicate;
import java.util.function.Supplier;

/**
* This plugin provides the Streams feature which builds upon data streams to
* provide the user with a more "batteries included" experience for ingesting large
* streams of data, such as logs.
*/
public class StreamsPlugin extends Plugin implements ActionPlugin {

@Override
public List<RestHandler> getRestHandlers(
Settings settings,
NamedWriteableRegistry namedWriteableRegistry,
RestController restController,
ClusterSettings clusterSettings,
IndexScopedSettings indexScopedSettings,
SettingsFilter settingsFilter,
IndexNameExpressionResolver indexNameExpressionResolver,
Supplier<DiscoveryNodes> nodesInCluster,
Predicate<NodeFeature> clusterSupportsFeature
) {
if (DataStream.LOGS_STREAM_FEATURE_FLAG) {
return List.of(new RestSetLogStreamsEnabledAction(), new RestStreamsStatusAction());
}
return Collections.emptyList();
}

@Override
public List<ActionHandler> getActions() {
return List.of(
new ActionHandler(LogsStreamsActivationToggleAction.INSTANCE, TransportLogsStreamsToggleActivation.class),
new ActionHandler(StreamsStatusAction.INSTANCE, TransportStreamsStatusAction.class)
);
}

@Override
public List<NamedWriteableRegistry.Entry> getNamedWriteables() {
return List.of(
new NamedWriteableRegistry.Entry(Metadata.ProjectCustom.class, StreamsMetadata.TYPE, StreamsMetadata::new),
new NamedWriteableRegistry.Entry(NamedDiff.class, StreamsMetadata.TYPE, StreamsMetadata::readDiffFrom)
);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.rest.streams.logs;

import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.support.master.AcknowledgedRequest;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.core.TimeValue;

import java.io.IOException;

public class LogsStreamsActivationToggleAction {

public static ActionType<AcknowledgedResponse> INSTANCE = new ActionType<>("cluster:admin/streams/logs/toggle");

public static class Request extends AcknowledgedRequest<Request> {

private final boolean enable;

public Request(TimeValue masterNodeTimeout, TimeValue ackTimeout, boolean enable) {
super(masterNodeTimeout, ackTimeout);
this.enable = enable;
}

public Request(StreamInput in) throws IOException {
super(in);
this.enable = in.readBoolean();
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeBoolean(enable);
}

@Override
public ActionRequestValidationException validate() {
return null;
}

@Override
public String toString() {
return "LogsStreamsActivationToggleAction.Request{" + "enable=" + enable + '}';
}

public boolean shouldEnable() {
return enable;
}
}
}
Loading
Loading