Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 41 additions & 0 deletions modules/streams/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

apply plugin: 'elasticsearch.test-with-dependencies'
apply plugin: 'elasticsearch.internal-cluster-test'
apply plugin: 'elasticsearch.internal-yaml-rest-test'
apply plugin: 'elasticsearch.internal-java-rest-test'
apply plugin: 'elasticsearch.yaml-rest-compat-test'

esplugin {
description = 'The module adds support for the wired streams functionality including logs ingest'
classname = 'org.elasticsearch.rest.streams.StreamsPlugin'
}

restResources {
restApi {
include '_common', 'streams'
}
}

configurations {
basicRestSpecs {
attributes {
attribute(ArtifactTypeDefinition.ARTIFACT_TYPE_ATTRIBUTE, ArtifactTypeDefinition.DIRECTORY_TYPE)
}
}
}

artifacts {
basicRestSpecs(new File(projectDir, "src/yamlRestTest/resources/rest-api-spec/test"))
}

dependencies {
testImplementation project(path: ':test:test-clusters')
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.rest.streams;

import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.cluster.metadata.StreamsMetadata;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.rest.streams.logs.LogsStreamsActivationToggleAction;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.hamcrest.ElasticsearchAssertions;
import org.elasticsearch.test.transport.MockTransportService;

import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.ExecutionException;

import static org.hamcrest.Matchers.is;

public class TestToggleIT extends ESIntegTestCase {

@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return List.of(StreamsPlugin.class, MockTransportService.TestPlugin.class);
}

public void testLogStreamToggle() throws IOException, ExecutionException, InterruptedException {
boolean[] testParams = new boolean[] { true, false, true };
for (boolean enable : testParams) {
doLogStreamToggleTest(enable);
}
}

private void doLogStreamToggleTest(boolean enable) throws IOException, ExecutionException, InterruptedException {
LogsStreamsActivationToggleAction.Request request = new LogsStreamsActivationToggleAction.Request(
TEST_REQUEST_TIMEOUT,
TEST_REQUEST_TIMEOUT,
enable
);

AcknowledgedResponse acknowledgedResponse = client().execute(LogsStreamsActivationToggleAction.INSTANCE, request).get();
ElasticsearchAssertions.assertAcked(acknowledgedResponse);

ClusterStateRequest state = new ClusterStateRequest(TEST_REQUEST_TIMEOUT);
ClusterStateResponse clusterStateResponse = client().admin().cluster().state(state).get();

assertThat(clusterStateResponse.getState().metadata().<StreamsMetadata>custom(StreamsMetadata.TYPE).isLogsEnabled(), is(enable));
}

}
19 changes: 19 additions & 0 deletions modules/streams/src/main/java/module-info.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

module org.elasticsearch.rest.root {
requires org.elasticsearch.server;
requires org.elasticsearch.xcontent;
requires org.apache.lucene.core;
requires org.elasticsearch.base;
requires org.apache.logging.log4j;

exports org.elasticsearch.rest.streams;
exports org.elasticsearch.rest.streams.logs;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.rest.streams;

import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.cluster.metadata.DataStream;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.IndexScopedSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.settings.SettingsFilter;
import org.elasticsearch.features.NodeFeature;
import org.elasticsearch.plugins.ActionPlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestHandler;
import org.elasticsearch.rest.streams.logs.LogsStreamsActivationToggleAction;
import org.elasticsearch.rest.streams.logs.RestSetLogStreamsEnabledAction;
import org.elasticsearch.rest.streams.logs.RestStreamsStatusAction;
import org.elasticsearch.rest.streams.logs.StreamsStatusAction;
import org.elasticsearch.rest.streams.logs.TransportLogsStreamsToggleActivation;
import org.elasticsearch.rest.streams.logs.TransportStreamsStatusAction;

import java.util.Collections;
import java.util.List;
import java.util.function.Predicate;
import java.util.function.Supplier;

/**
* This plugin provides the Streams feature which builds upon data streams to
* provide the user with a more "batteries included" experience for ingesting large
* streams of data, such as logs.
*/
public class StreamsPlugin extends Plugin implements ActionPlugin {

@Override
public List<RestHandler> getRestHandlers(
Settings settings,
NamedWriteableRegistry namedWriteableRegistry,
RestController restController,
ClusterSettings clusterSettings,
IndexScopedSettings indexScopedSettings,
SettingsFilter settingsFilter,
IndexNameExpressionResolver indexNameExpressionResolver,
Supplier<DiscoveryNodes> nodesInCluster,
Predicate<NodeFeature> clusterSupportsFeature
) {
if (DataStream.LOGS_STREAM_FEATURE_FLAG) {
return List.of(new RestSetLogStreamsEnabledAction(), new RestStreamsStatusAction());
}
return Collections.emptyList();
}

@Override
public List<ActionHandler<? extends ActionRequest, ? extends ActionResponse>> getActions() {
return List.of(
new ActionHandler<>(LogsStreamsActivationToggleAction.INSTANCE, TransportLogsStreamsToggleActivation.class),
new ActionHandler<>(StreamsStatusAction.INSTANCE, TransportStreamsStatusAction.class)
);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.rest.streams.logs;

import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.support.master.AcknowledgedRequest;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.tasks.CancellableTask;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskId;

import java.io.IOException;
import java.util.Map;

public class LogsStreamsActivationToggleAction {

public static ActionType<AcknowledgedResponse> INSTANCE = new ActionType<>("cluster:admin/streams/logs/toggle");

public static class Request extends AcknowledgedRequest<Request> {

private final boolean enable;

public Request(TimeValue masterNodeTimeout, TimeValue ackTimeout, boolean enable) {
super(masterNodeTimeout, ackTimeout);
this.enable = enable;
}

public Request(StreamInput in) throws IOException {
super(in);
this.enable = in.readBoolean();
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeBoolean(enable);
}

@Override
public String toString() {
return "LogsStreamsActivationToggleAction.Request{" + "enable=" + enable + '}';
}

public boolean shouldEnable() {
return enable;
}

@Override
public Task createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
return new CancellableTask(id, type, action, "Logs streams activation toggle request", parentTaskId, headers);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.rest.streams.logs;

import org.elasticsearch.client.internal.node.NodeClient;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.RestUtils;
import org.elasticsearch.rest.Scope;
import org.elasticsearch.rest.ServerlessScope;
import org.elasticsearch.rest.action.RestCancellableNodeClient;
import org.elasticsearch.rest.action.RestToXContentListener;

import java.util.List;
import java.util.Set;

import static org.elasticsearch.rest.RestRequest.Method.POST;

@ServerlessScope(Scope.PUBLIC)
public class RestSetLogStreamsEnabledAction extends BaseRestHandler {

public static final Set<String> SUPPORTED_PARAMS = Set.of(RestUtils.REST_MASTER_TIMEOUT_PARAM, RestUtils.REST_TIMEOUT_PARAM);

@Override
public String getName() {
return "streams_logs_set_enabled_action";
}

@Override
public List<Route> routes() {
return List.of(new Route(POST, "/_streams/logs/_enable"), new Route(POST, "/_streams/logs/_disable"));
}

@Override
protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) {
final boolean enabled = request.path().endsWith("_enable");
assert enabled || request.path().endsWith("_disable");

LogsStreamsActivationToggleAction.Request activationRequest = new LogsStreamsActivationToggleAction.Request(
RestUtils.getMasterNodeTimeout(request),
RestUtils.getAckTimeout(request),
enabled
);

return restChannel -> new RestCancellableNodeClient(client, request.getHttpChannel()).execute(
LogsStreamsActivationToggleAction.INSTANCE,
activationRequest,
new RestToXContentListener<>(restChannel)
);
}

@Override
public Set<String> supportedQueryParameters() {
return SUPPORTED_PARAMS;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.rest.streams.logs;

import org.elasticsearch.client.internal.node.NodeClient;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.RestHandler;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.Scope;
import org.elasticsearch.rest.ServerlessScope;
import org.elasticsearch.rest.action.RestCancellableNodeClient;
import org.elasticsearch.rest.action.RestToXContentListener;

import java.util.List;

import static org.elasticsearch.rest.RestRequest.Method.GET;

@ServerlessScope(Scope.PUBLIC)
public class RestStreamsStatusAction extends BaseRestHandler {

@Override
public String getName() {
return "streams_status_action";
}

@Override
public List<RestHandler.Route> routes() {
return List.of(new RestHandler.Route(GET, "/_streams/status"));
}

@Override
protected BaseRestHandler.RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) {
StreamsStatusAction.Request statusRequest = new StreamsStatusAction.Request();
return restChannel -> new RestCancellableNodeClient(client, request.getHttpChannel()).execute(
StreamsStatusAction.INSTANCE,
statusRequest,
new RestToXContentListener<>(restChannel)
);
}

}
Loading