Skip to content

Commit 7ea565a

Browse files
authored
[Backport] Streams - Log's Enable, Disable and Status endpoints (elastic#129474) (elastic#129838)
* Streams - Log's Enable, Disable and Status endpoints (elastic#129474) * Enable And Disable Endpoint * Status Endpoint * Integration Tests * REST Spec * REST Spec tests * Some documentation * Update docs/changelog/129474.yaml * Fix failing security test * PR Fixes * PR Fixes - Add missing feature flag name to YAML spec * PR Fixes - Fix support for timeout and master_timeout parameters * PR Fixes - Make the REST handler validation happy with the new params * Delete docs/changelog/129474.yaml * PR Fixes - Switch to local metadata action type and improve request handling * PR Fixes - Make enable / disable endpoint cancellable * PR Fixes - Switch timeout param name for status endpoint * PR Fixes - Switch timeout param name for status endpoint in spec * PR Fixes - Enforce local only use for status action * PR Fixes - Refactor StreamsMetadata into server * PR Fixes - Add streams module to multi project YAML test suite * PR Fixes - Add streams cluster module to multi project YAML test suite * Null out reader in status transport action super constructor * Added comment about minimum version support
1 parent 48197b7 commit 7ea565a

File tree

20 files changed

+926
-0
lines changed

20 files changed

+926
-0
lines changed

modules/streams/build.gradle

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
apply plugin: 'elasticsearch.test-with-dependencies'
11+
apply plugin: 'elasticsearch.internal-cluster-test'
12+
apply plugin: 'elasticsearch.internal-yaml-rest-test'
13+
apply plugin: 'elasticsearch.internal-java-rest-test'
14+
apply plugin: 'elasticsearch.yaml-rest-compat-test'
15+
16+
esplugin {
17+
description = 'The module adds support for the wired streams functionality including logs ingest'
18+
classname = 'org.elasticsearch.rest.streams.StreamsPlugin'
19+
}
20+
21+
restResources {
22+
restApi {
23+
include '_common', 'streams'
24+
}
25+
}
26+
27+
configurations {
28+
basicRestSpecs {
29+
attributes {
30+
attribute(ArtifactTypeDefinition.ARTIFACT_TYPE_ATTRIBUTE, ArtifactTypeDefinition.DIRECTORY_TYPE)
31+
}
32+
}
33+
}
34+
35+
artifacts {
36+
basicRestSpecs(new File(projectDir, "src/yamlRestTest/resources/rest-api-spec/test"))
37+
}
38+
39+
dependencies {
40+
testImplementation project(path: ':test:test-clusters')
41+
}
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.rest.streams;
11+
12+
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
13+
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
14+
import org.elasticsearch.action.support.master.AcknowledgedResponse;
15+
import org.elasticsearch.cluster.metadata.StreamsMetadata;
16+
import org.elasticsearch.plugins.Plugin;
17+
import org.elasticsearch.rest.streams.logs.LogsStreamsActivationToggleAction;
18+
import org.elasticsearch.test.ESIntegTestCase;
19+
import org.elasticsearch.test.hamcrest.ElasticsearchAssertions;
20+
import org.elasticsearch.test.transport.MockTransportService;
21+
22+
import java.io.IOException;
23+
import java.util.Collection;
24+
import java.util.List;
25+
import java.util.concurrent.ExecutionException;
26+
27+
import static org.hamcrest.Matchers.is;
28+
29+
public class TestToggleIT extends ESIntegTestCase {
30+
31+
@Override
32+
protected Collection<Class<? extends Plugin>> nodePlugins() {
33+
return List.of(StreamsPlugin.class, MockTransportService.TestPlugin.class);
34+
}
35+
36+
public void testLogStreamToggle() throws IOException, ExecutionException, InterruptedException {
37+
boolean[] testParams = new boolean[] { true, false, true };
38+
for (boolean enable : testParams) {
39+
doLogStreamToggleTest(enable);
40+
}
41+
}
42+
43+
private void doLogStreamToggleTest(boolean enable) throws IOException, ExecutionException, InterruptedException {
44+
LogsStreamsActivationToggleAction.Request request = new LogsStreamsActivationToggleAction.Request(
45+
TEST_REQUEST_TIMEOUT,
46+
TEST_REQUEST_TIMEOUT,
47+
enable
48+
);
49+
50+
AcknowledgedResponse acknowledgedResponse = client().execute(LogsStreamsActivationToggleAction.INSTANCE, request).get();
51+
ElasticsearchAssertions.assertAcked(acknowledgedResponse);
52+
53+
ClusterStateRequest state = new ClusterStateRequest(TEST_REQUEST_TIMEOUT);
54+
ClusterStateResponse clusterStateResponse = client().admin().cluster().state(state).get();
55+
56+
assertThat(clusterStateResponse.getState().metadata().<StreamsMetadata>custom(StreamsMetadata.TYPE).isLogsEnabled(), is(enable));
57+
}
58+
59+
}
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
module org.elasticsearch.rest.root {
11+
requires org.elasticsearch.server;
12+
requires org.elasticsearch.xcontent;
13+
requires org.apache.lucene.core;
14+
requires org.elasticsearch.base;
15+
requires org.apache.logging.log4j;
16+
17+
exports org.elasticsearch.rest.streams;
18+
exports org.elasticsearch.rest.streams.logs;
19+
}
Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.rest.streams;
11+
12+
import org.elasticsearch.action.ActionRequest;
13+
import org.elasticsearch.action.ActionResponse;
14+
import org.elasticsearch.cluster.metadata.DataStream;
15+
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
16+
import org.elasticsearch.cluster.node.DiscoveryNodes;
17+
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
18+
import org.elasticsearch.common.settings.ClusterSettings;
19+
import org.elasticsearch.common.settings.IndexScopedSettings;
20+
import org.elasticsearch.common.settings.Settings;
21+
import org.elasticsearch.common.settings.SettingsFilter;
22+
import org.elasticsearch.features.NodeFeature;
23+
import org.elasticsearch.plugins.ActionPlugin;
24+
import org.elasticsearch.plugins.Plugin;
25+
import org.elasticsearch.rest.RestController;
26+
import org.elasticsearch.rest.RestHandler;
27+
import org.elasticsearch.rest.streams.logs.LogsStreamsActivationToggleAction;
28+
import org.elasticsearch.rest.streams.logs.RestSetLogStreamsEnabledAction;
29+
import org.elasticsearch.rest.streams.logs.RestStreamsStatusAction;
30+
import org.elasticsearch.rest.streams.logs.StreamsStatusAction;
31+
import org.elasticsearch.rest.streams.logs.TransportLogsStreamsToggleActivation;
32+
import org.elasticsearch.rest.streams.logs.TransportStreamsStatusAction;
33+
34+
import java.util.Collections;
35+
import java.util.List;
36+
import java.util.function.Predicate;
37+
import java.util.function.Supplier;
38+
39+
/**
40+
* This plugin provides the Streams feature which builds upon data streams to
41+
* provide the user with a more "batteries included" experience for ingesting large
42+
* streams of data, such as logs.
43+
*/
44+
public class StreamsPlugin extends Plugin implements ActionPlugin {
45+
46+
@Override
47+
public List<RestHandler> getRestHandlers(
48+
Settings settings,
49+
NamedWriteableRegistry namedWriteableRegistry,
50+
RestController restController,
51+
ClusterSettings clusterSettings,
52+
IndexScopedSettings indexScopedSettings,
53+
SettingsFilter settingsFilter,
54+
IndexNameExpressionResolver indexNameExpressionResolver,
55+
Supplier<DiscoveryNodes> nodesInCluster,
56+
Predicate<NodeFeature> clusterSupportsFeature
57+
) {
58+
if (DataStream.LOGS_STREAM_FEATURE_FLAG) {
59+
return List.of(new RestSetLogStreamsEnabledAction(), new RestStreamsStatusAction());
60+
}
61+
return Collections.emptyList();
62+
}
63+
64+
@Override
65+
public List<ActionHandler<? extends ActionRequest, ? extends ActionResponse>> getActions() {
66+
return List.of(
67+
new ActionHandler<>(LogsStreamsActivationToggleAction.INSTANCE, TransportLogsStreamsToggleActivation.class),
68+
new ActionHandler<>(StreamsStatusAction.INSTANCE, TransportStreamsStatusAction.class)
69+
);
70+
}
71+
}
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.rest.streams.logs;
11+
12+
import org.elasticsearch.action.ActionType;
13+
import org.elasticsearch.action.support.master.AcknowledgedRequest;
14+
import org.elasticsearch.action.support.master.AcknowledgedResponse;
15+
import org.elasticsearch.common.io.stream.StreamInput;
16+
import org.elasticsearch.common.io.stream.StreamOutput;
17+
import org.elasticsearch.core.TimeValue;
18+
import org.elasticsearch.tasks.CancellableTask;
19+
import org.elasticsearch.tasks.Task;
20+
import org.elasticsearch.tasks.TaskId;
21+
22+
import java.io.IOException;
23+
import java.util.Map;
24+
25+
public class LogsStreamsActivationToggleAction {
26+
27+
public static ActionType<AcknowledgedResponse> INSTANCE = new ActionType<>("cluster:admin/streams/logs/toggle");
28+
29+
public static class Request extends AcknowledgedRequest<Request> {
30+
31+
private final boolean enable;
32+
33+
public Request(TimeValue masterNodeTimeout, TimeValue ackTimeout, boolean enable) {
34+
super(masterNodeTimeout, ackTimeout);
35+
this.enable = enable;
36+
}
37+
38+
public Request(StreamInput in) throws IOException {
39+
super(in);
40+
this.enable = in.readBoolean();
41+
}
42+
43+
@Override
44+
public void writeTo(StreamOutput out) throws IOException {
45+
super.writeTo(out);
46+
out.writeBoolean(enable);
47+
}
48+
49+
@Override
50+
public String toString() {
51+
return "LogsStreamsActivationToggleAction.Request{" + "enable=" + enable + '}';
52+
}
53+
54+
public boolean shouldEnable() {
55+
return enable;
56+
}
57+
58+
@Override
59+
public Task createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
60+
return new CancellableTask(id, type, action, "Logs streams activation toggle request", parentTaskId, headers);
61+
}
62+
}
63+
}
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.rest.streams.logs;
11+
12+
import org.elasticsearch.client.internal.node.NodeClient;
13+
import org.elasticsearch.rest.BaseRestHandler;
14+
import org.elasticsearch.rest.RestRequest;
15+
import org.elasticsearch.rest.RestUtils;
16+
import org.elasticsearch.rest.Scope;
17+
import org.elasticsearch.rest.ServerlessScope;
18+
import org.elasticsearch.rest.action.RestCancellableNodeClient;
19+
import org.elasticsearch.rest.action.RestToXContentListener;
20+
21+
import java.util.List;
22+
import java.util.Set;
23+
24+
import static org.elasticsearch.rest.RestRequest.Method.POST;
25+
26+
@ServerlessScope(Scope.PUBLIC)
27+
public class RestSetLogStreamsEnabledAction extends BaseRestHandler {
28+
29+
public static final Set<String> SUPPORTED_PARAMS = Set.of(RestUtils.REST_MASTER_TIMEOUT_PARAM, RestUtils.REST_TIMEOUT_PARAM);
30+
31+
@Override
32+
public String getName() {
33+
return "streams_logs_set_enabled_action";
34+
}
35+
36+
@Override
37+
public List<Route> routes() {
38+
return List.of(new Route(POST, "/_streams/logs/_enable"), new Route(POST, "/_streams/logs/_disable"));
39+
}
40+
41+
@Override
42+
protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) {
43+
final boolean enabled = request.path().endsWith("_enable");
44+
assert enabled || request.path().endsWith("_disable");
45+
46+
LogsStreamsActivationToggleAction.Request activationRequest = new LogsStreamsActivationToggleAction.Request(
47+
RestUtils.getMasterNodeTimeout(request),
48+
RestUtils.getAckTimeout(request),
49+
enabled
50+
);
51+
52+
return restChannel -> new RestCancellableNodeClient(client, request.getHttpChannel()).execute(
53+
LogsStreamsActivationToggleAction.INSTANCE,
54+
activationRequest,
55+
new RestToXContentListener<>(restChannel)
56+
);
57+
}
58+
59+
@Override
60+
public Set<String> supportedQueryParameters() {
61+
return SUPPORTED_PARAMS;
62+
}
63+
64+
}
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.rest.streams.logs;
11+
12+
import org.elasticsearch.client.internal.node.NodeClient;
13+
import org.elasticsearch.rest.BaseRestHandler;
14+
import org.elasticsearch.rest.RestHandler;
15+
import org.elasticsearch.rest.RestRequest;
16+
import org.elasticsearch.rest.Scope;
17+
import org.elasticsearch.rest.ServerlessScope;
18+
import org.elasticsearch.rest.action.RestCancellableNodeClient;
19+
import org.elasticsearch.rest.action.RestToXContentListener;
20+
21+
import java.util.List;
22+
23+
import static org.elasticsearch.rest.RestRequest.Method.GET;
24+
25+
@ServerlessScope(Scope.PUBLIC)
26+
public class RestStreamsStatusAction extends BaseRestHandler {
27+
28+
@Override
29+
public String getName() {
30+
return "streams_status_action";
31+
}
32+
33+
@Override
34+
public List<RestHandler.Route> routes() {
35+
return List.of(new RestHandler.Route(GET, "/_streams/status"));
36+
}
37+
38+
@Override
39+
protected BaseRestHandler.RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) {
40+
StreamsStatusAction.Request statusRequest = new StreamsStatusAction.Request();
41+
return restChannel -> new RestCancellableNodeClient(client, request.getHttpChannel()).execute(
42+
StreamsStatusAction.INSTANCE,
43+
statusRequest,
44+
new RestToXContentListener<>(restChannel)
45+
);
46+
}
47+
48+
}

0 commit comments

Comments
 (0)