Skip to content

Commit 1ccf1c6

Browse files
authored
Streams - Log's Enable, Disable and Status endpoints (#129474)
* Enable And Disable Endpoint * Status Endpoint * Integration Tests * REST Spec * REST Spec tests * Some documentation * Update docs/changelog/129474.yaml * Fix failing security test * PR Fixes * PR Fixes - Add missing feature flag name to YAML spec * PR Fixes - Fix support for timeout and master_timeout parameters * PR Fixes - Make the REST handler validation happy with the new params * Delete docs/changelog/129474.yaml * PR Fixes - Switch to local metadata action type and improve request handling * PR Fixes - Make enable / disable endpoint cancellable * PR Fixes - Switch timeout param name for status endpoint * PR Fixes - Switch timeout param name for status endpoint in spec * PR Fixes - Enforce local only use for status action * PR Fixes - Refactor StreamsMetadata into server * PR Fixes - Add streams module to multi project YAML test suite * PR Fixes - Add streams cluster module to multi project YAML test suite
1 parent 6858c32 commit 1ccf1c6

File tree

22 files changed

+957
-0
lines changed

22 files changed

+957
-0
lines changed

modules/streams/build.gradle

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
apply plugin: 'elasticsearch.test-with-dependencies'
11+
apply plugin: 'elasticsearch.internal-cluster-test'
12+
apply plugin: 'elasticsearch.internal-yaml-rest-test'
13+
apply plugin: 'elasticsearch.internal-java-rest-test'
14+
apply plugin: 'elasticsearch.yaml-rest-compat-test'
15+
16+
esplugin {
17+
description = 'The module adds support for the wired streams functionality including logs ingest'
18+
classname = 'org.elasticsearch.rest.streams.StreamsPlugin'
19+
}
20+
21+
restResources {
22+
restApi {
23+
include '_common', 'streams'
24+
}
25+
}
26+
27+
configurations {
28+
basicRestSpecs {
29+
attributes {
30+
attribute(ArtifactTypeDefinition.ARTIFACT_TYPE_ATTRIBUTE, ArtifactTypeDefinition.DIRECTORY_TYPE)
31+
}
32+
}
33+
}
34+
35+
artifacts {
36+
basicRestSpecs(new File(projectDir, "src/yamlRestTest/resources/rest-api-spec/test"))
37+
}
38+
39+
dependencies {
40+
testImplementation project(path: ':test:test-clusters')
41+
}
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.rest.streams;
11+
12+
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
13+
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
14+
import org.elasticsearch.action.support.master.AcknowledgedResponse;
15+
import org.elasticsearch.cluster.metadata.ProjectId;
16+
import org.elasticsearch.cluster.metadata.ProjectMetadata;
17+
import org.elasticsearch.cluster.metadata.StreamsMetadata;
18+
import org.elasticsearch.plugins.Plugin;
19+
import org.elasticsearch.rest.streams.logs.LogsStreamsActivationToggleAction;
20+
import org.elasticsearch.test.ESIntegTestCase;
21+
import org.elasticsearch.test.hamcrest.ElasticsearchAssertions;
22+
import org.elasticsearch.test.transport.MockTransportService;
23+
24+
import java.io.IOException;
25+
import java.util.Collection;
26+
import java.util.List;
27+
import java.util.concurrent.ExecutionException;
28+
29+
import static org.hamcrest.Matchers.is;
30+
31+
public class TestToggleIT extends ESIntegTestCase {
32+
33+
@Override
34+
protected Collection<Class<? extends Plugin>> nodePlugins() {
35+
return List.of(StreamsPlugin.class, MockTransportService.TestPlugin.class);
36+
}
37+
38+
public void testLogStreamToggle() throws IOException, ExecutionException, InterruptedException {
39+
boolean[] testParams = new boolean[] { true, false, true };
40+
for (boolean enable : testParams) {
41+
doLogStreamToggleTest(enable);
42+
}
43+
}
44+
45+
private void doLogStreamToggleTest(boolean enable) throws IOException, ExecutionException, InterruptedException {
46+
LogsStreamsActivationToggleAction.Request request = new LogsStreamsActivationToggleAction.Request(
47+
TEST_REQUEST_TIMEOUT,
48+
TEST_REQUEST_TIMEOUT,
49+
enable
50+
);
51+
52+
AcknowledgedResponse acknowledgedResponse = client().execute(LogsStreamsActivationToggleAction.INSTANCE, request).get();
53+
ElasticsearchAssertions.assertAcked(acknowledgedResponse);
54+
55+
ClusterStateRequest state = new ClusterStateRequest(TEST_REQUEST_TIMEOUT);
56+
ClusterStateResponse clusterStateResponse = client().admin().cluster().state(state).get();
57+
ProjectMetadata projectMetadata = clusterStateResponse.getState().metadata().getProject(ProjectId.DEFAULT);
58+
59+
assertThat(projectMetadata.<StreamsMetadata>custom(StreamsMetadata.TYPE).isLogsEnabled(), is(enable));
60+
}
61+
62+
}
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
module org.elasticsearch.rest.root {
11+
requires org.elasticsearch.server;
12+
requires org.elasticsearch.xcontent;
13+
requires org.apache.lucene.core;
14+
requires org.elasticsearch.base;
15+
requires org.apache.logging.log4j;
16+
17+
exports org.elasticsearch.rest.streams;
18+
exports org.elasticsearch.rest.streams.logs;
19+
}
Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.rest.streams;
11+
12+
import org.elasticsearch.cluster.metadata.DataStream;
13+
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
14+
import org.elasticsearch.cluster.node.DiscoveryNodes;
15+
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
16+
import org.elasticsearch.common.settings.ClusterSettings;
17+
import org.elasticsearch.common.settings.IndexScopedSettings;
18+
import org.elasticsearch.common.settings.Settings;
19+
import org.elasticsearch.common.settings.SettingsFilter;
20+
import org.elasticsearch.features.NodeFeature;
21+
import org.elasticsearch.plugins.ActionPlugin;
22+
import org.elasticsearch.plugins.Plugin;
23+
import org.elasticsearch.rest.RestController;
24+
import org.elasticsearch.rest.RestHandler;
25+
import org.elasticsearch.rest.streams.logs.LogsStreamsActivationToggleAction;
26+
import org.elasticsearch.rest.streams.logs.RestSetLogStreamsEnabledAction;
27+
import org.elasticsearch.rest.streams.logs.RestStreamsStatusAction;
28+
import org.elasticsearch.rest.streams.logs.StreamsStatusAction;
29+
import org.elasticsearch.rest.streams.logs.TransportLogsStreamsToggleActivation;
30+
import org.elasticsearch.rest.streams.logs.TransportStreamsStatusAction;
31+
32+
import java.util.Collections;
33+
import java.util.List;
34+
import java.util.function.Predicate;
35+
import java.util.function.Supplier;
36+
37+
/**
38+
* This plugin provides the Streams feature which builds upon data streams to
39+
* provide the user with a more "batteries included" experience for ingesting large
40+
* streams of data, such as logs.
41+
*/
42+
public class StreamsPlugin extends Plugin implements ActionPlugin {
43+
44+
@Override
45+
public List<RestHandler> getRestHandlers(
46+
Settings settings,
47+
NamedWriteableRegistry namedWriteableRegistry,
48+
RestController restController,
49+
ClusterSettings clusterSettings,
50+
IndexScopedSettings indexScopedSettings,
51+
SettingsFilter settingsFilter,
52+
IndexNameExpressionResolver indexNameExpressionResolver,
53+
Supplier<DiscoveryNodes> nodesInCluster,
54+
Predicate<NodeFeature> clusterSupportsFeature
55+
) {
56+
if (DataStream.LOGS_STREAM_FEATURE_FLAG) {
57+
return List.of(new RestSetLogStreamsEnabledAction(), new RestStreamsStatusAction());
58+
}
59+
return Collections.emptyList();
60+
}
61+
62+
@Override
63+
public List<ActionHandler> getActions() {
64+
return List.of(
65+
new ActionHandler(LogsStreamsActivationToggleAction.INSTANCE, TransportLogsStreamsToggleActivation.class),
66+
new ActionHandler(StreamsStatusAction.INSTANCE, TransportStreamsStatusAction.class)
67+
);
68+
}
69+
}
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.rest.streams.logs;
11+
12+
import org.elasticsearch.action.ActionType;
13+
import org.elasticsearch.action.support.master.AcknowledgedRequest;
14+
import org.elasticsearch.action.support.master.AcknowledgedResponse;
15+
import org.elasticsearch.common.io.stream.StreamInput;
16+
import org.elasticsearch.common.io.stream.StreamOutput;
17+
import org.elasticsearch.core.TimeValue;
18+
import org.elasticsearch.tasks.CancellableTask;
19+
import org.elasticsearch.tasks.Task;
20+
import org.elasticsearch.tasks.TaskId;
21+
22+
import java.io.IOException;
23+
import java.util.Map;
24+
25+
public class LogsStreamsActivationToggleAction {
26+
27+
public static ActionType<AcknowledgedResponse> INSTANCE = new ActionType<>("cluster:admin/streams/logs/toggle");
28+
29+
public static class Request extends AcknowledgedRequest<Request> {
30+
31+
private final boolean enable;
32+
33+
public Request(TimeValue masterNodeTimeout, TimeValue ackTimeout, boolean enable) {
34+
super(masterNodeTimeout, ackTimeout);
35+
this.enable = enable;
36+
}
37+
38+
public Request(StreamInput in) throws IOException {
39+
super(in);
40+
this.enable = in.readBoolean();
41+
}
42+
43+
@Override
44+
public void writeTo(StreamOutput out) throws IOException {
45+
super.writeTo(out);
46+
out.writeBoolean(enable);
47+
}
48+
49+
@Override
50+
public String toString() {
51+
return "LogsStreamsActivationToggleAction.Request{" + "enable=" + enable + '}';
52+
}
53+
54+
public boolean shouldEnable() {
55+
return enable;
56+
}
57+
58+
@Override
59+
public Task createTask(String localNodeId, long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
60+
return new CancellableTask(id, type, action, "Logs streams activation toggle request", parentTaskId, headers);
61+
}
62+
}
63+
}
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.rest.streams.logs;
11+
12+
import org.elasticsearch.client.internal.node.NodeClient;
13+
import org.elasticsearch.rest.BaseRestHandler;
14+
import org.elasticsearch.rest.RestRequest;
15+
import org.elasticsearch.rest.RestUtils;
16+
import org.elasticsearch.rest.Scope;
17+
import org.elasticsearch.rest.ServerlessScope;
18+
import org.elasticsearch.rest.action.RestCancellableNodeClient;
19+
import org.elasticsearch.rest.action.RestToXContentListener;
20+
21+
import java.util.List;
22+
import java.util.Set;
23+
24+
import static org.elasticsearch.rest.RestRequest.Method.POST;
25+
26+
@ServerlessScope(Scope.PUBLIC)
27+
public class RestSetLogStreamsEnabledAction extends BaseRestHandler {
28+
29+
public static final Set<String> SUPPORTED_PARAMS = Set.of(RestUtils.REST_MASTER_TIMEOUT_PARAM, RestUtils.REST_TIMEOUT_PARAM);
30+
31+
@Override
32+
public String getName() {
33+
return "streams_logs_set_enabled_action";
34+
}
35+
36+
@Override
37+
public List<Route> routes() {
38+
return List.of(new Route(POST, "/_streams/logs/_enable"), new Route(POST, "/_streams/logs/_disable"));
39+
}
40+
41+
@Override
42+
protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) {
43+
final boolean enabled = request.path().endsWith("_enable");
44+
assert enabled || request.path().endsWith("_disable");
45+
46+
LogsStreamsActivationToggleAction.Request activationRequest = new LogsStreamsActivationToggleAction.Request(
47+
RestUtils.getMasterNodeTimeout(request),
48+
RestUtils.getAckTimeout(request),
49+
enabled
50+
);
51+
52+
return restChannel -> new RestCancellableNodeClient(client, request.getHttpChannel()).execute(
53+
LogsStreamsActivationToggleAction.INSTANCE,
54+
activationRequest,
55+
new RestToXContentListener<>(restChannel)
56+
);
57+
}
58+
59+
@Override
60+
public Set<String> supportedQueryParameters() {
61+
return SUPPORTED_PARAMS;
62+
}
63+
64+
}
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.rest.streams.logs;
11+
12+
import org.elasticsearch.client.internal.node.NodeClient;
13+
import org.elasticsearch.rest.BaseRestHandler;
14+
import org.elasticsearch.rest.RestHandler;
15+
import org.elasticsearch.rest.RestRequest;
16+
import org.elasticsearch.rest.RestUtils;
17+
import org.elasticsearch.rest.Scope;
18+
import org.elasticsearch.rest.ServerlessScope;
19+
import org.elasticsearch.rest.action.RestCancellableNodeClient;
20+
import org.elasticsearch.rest.action.RestToXContentListener;
21+
22+
import java.util.Collections;
23+
import java.util.List;
24+
import java.util.Set;
25+
26+
import static org.elasticsearch.rest.RestRequest.Method.GET;
27+
28+
@ServerlessScope(Scope.PUBLIC)
29+
public class RestStreamsStatusAction extends BaseRestHandler {
30+
31+
public static final Set<String> SUPPORTED_PARAMS = Collections.singleton(RestUtils.REST_MASTER_TIMEOUT_PARAM);
32+
33+
@Override
34+
public String getName() {
35+
return "streams_status_action";
36+
}
37+
38+
@Override
39+
public List<RestHandler.Route> routes() {
40+
return List.of(new RestHandler.Route(GET, "/_streams/status"));
41+
}
42+
43+
@Override
44+
protected BaseRestHandler.RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) {
45+
StreamsStatusAction.Request statusRequest = new StreamsStatusAction.Request(RestUtils.getMasterNodeTimeout(request));
46+
return restChannel -> new RestCancellableNodeClient(client, request.getHttpChannel()).execute(
47+
StreamsStatusAction.INSTANCE,
48+
statusRequest,
49+
new RestToXContentListener<>(restChannel)
50+
);
51+
}
52+
53+
@Override
54+
public Set<String> supportedQueryParameters() {
55+
return SUPPORTED_PARAMS;
56+
}
57+
}

0 commit comments

Comments
 (0)