Skip to content

Commit 63f07aa

Browse files
committed
Enable And Disable Endpoint
1 parent b517bc7 commit 63f07aa

File tree

9 files changed

+444
-1
lines changed

9 files changed

+444
-1
lines changed

modules/streams/build.gradle

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
apply plugin: 'elasticsearch.test-with-dependencies'
2+
apply plugin: 'elasticsearch.internal-cluster-test'
3+
apply plugin: 'elasticsearch.internal-yaml-rest-test'
4+
apply plugin: 'elasticsearch.internal-java-rest-test'
5+
apply plugin: 'elasticsearch.yaml-rest-compat-test'
6+
7+
esplugin {
8+
description = 'The module adds support for the wired streams functionality including logs ingest'
9+
classname = 'org.elasticsearch.rest.streams.StreamsPlugin'
10+
}
11+
12+
restResources {
13+
restApi {
14+
// TODO: Limit this down to just required API's for faster build. See https://github.com/elastic/elasticsearch/blob/fb3149cc664eb7061d55741a87e7e8cf29db4989/TESTING.asciidoc#L443
15+
include '*'
16+
}
17+
}
18+
19+
dependencies {
20+
testImplementation project(path: ':test:test-clusters')
21+
}
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
module org.elasticsearch.rest.root {
11+
requires org.elasticsearch.server;
12+
requires org.elasticsearch.xcontent;
13+
requires org.apache.lucene.core;
14+
requires org.elasticsearch.base;
15+
requires org.apache.logging.log4j;
16+
17+
exports org.elasticsearch.rest.streams;
18+
exports org.elasticsearch.rest.streams.logs;
19+
}
Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.rest.streams;
11+
12+
import org.elasticsearch.TransportVersion;
13+
import org.elasticsearch.TransportVersions;
14+
import org.elasticsearch.cluster.AbstractNamedDiffable;
15+
import org.elasticsearch.cluster.metadata.Metadata;
16+
import org.elasticsearch.common.collect.Iterators;
17+
import org.elasticsearch.common.io.stream.StreamInput;
18+
import org.elasticsearch.common.io.stream.StreamOutput;
19+
import org.elasticsearch.common.xcontent.ChunkedToXContentHelper;
20+
import org.elasticsearch.xcontent.ToXContent;
21+
22+
import java.io.IOException;
23+
import java.util.EnumSet;
24+
import java.util.Iterator;
25+
26+
/**
27+
* Metadata for the Streams feature, which allows enabling or disabling logs for data streams.
28+
* This class implements the Metadata.ProjectCustom interface to allow it to be stored in the cluster state.
29+
*/
30+
public class StreamsMetadata extends AbstractNamedDiffable<Metadata.ProjectCustom> implements Metadata.ProjectCustom {
31+
32+
public static final String TYPE = "streams";
33+
public static final StreamsMetadata EMPTY = new StreamsMetadata(false);
34+
35+
public boolean logsEnabled;
36+
37+
public StreamsMetadata(StreamInput in) throws IOException {
38+
logsEnabled = in.readBoolean();
39+
}
40+
41+
public StreamsMetadata(boolean logsEnabled) {
42+
this.logsEnabled = logsEnabled;
43+
}
44+
45+
public boolean isLogsEnabled() {
46+
return logsEnabled;
47+
}
48+
49+
public void setLogsEnabled(boolean logsEnabled) {
50+
this.logsEnabled = logsEnabled;
51+
}
52+
53+
@Override
54+
public EnumSet<Metadata.XContentContext> context() {
55+
return Metadata.ALL_CONTEXTS;
56+
}
57+
58+
@Override
59+
public String getWriteableName() {
60+
return TYPE;
61+
}
62+
63+
@Override
64+
public TransportVersion getMinimalSupportedVersion() {
65+
return TransportVersions.STREAMS_LOGS_SUPPORT;
66+
}
67+
68+
@Override
69+
public void writeTo(StreamOutput out) throws IOException {
70+
out.writeBoolean(logsEnabled);
71+
}
72+
73+
@Override
74+
public Iterator<? extends ToXContent> toXContentChunked(ToXContent.Params params) {
75+
return Iterators.concat(ChunkedToXContentHelper.chunk((builder, bParams) -> builder.field("logs_enabled", logsEnabled)));
76+
}
77+
78+
}
Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.rest.streams;
11+
12+
import org.elasticsearch.cluster.metadata.DataStream;
13+
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
14+
import org.elasticsearch.cluster.metadata.Metadata;
15+
import org.elasticsearch.cluster.node.DiscoveryNodes;
16+
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
17+
import org.elasticsearch.common.settings.ClusterSettings;
18+
import org.elasticsearch.common.settings.IndexScopedSettings;
19+
import org.elasticsearch.common.settings.Settings;
20+
import org.elasticsearch.common.settings.SettingsFilter;
21+
import org.elasticsearch.features.NodeFeature;
22+
import org.elasticsearch.plugins.ActionPlugin;
23+
import org.elasticsearch.plugins.Plugin;
24+
import org.elasticsearch.rest.RestController;
25+
import org.elasticsearch.rest.RestHandler;
26+
import org.elasticsearch.rest.streams.logs.LogsStreamsActivationToggleAction;
27+
import org.elasticsearch.rest.streams.logs.RestSetLogStreamsEnabledAction;
28+
import org.elasticsearch.rest.streams.logs.TransportLogsStreamsToggleActivation;
29+
30+
import java.util.Collections;
31+
import java.util.List;
32+
import java.util.function.Predicate;
33+
import java.util.function.Supplier;
34+
35+
/**
36+
* Document here the Streams plugin.
37+
*/
38+
public class StreamsPlugin extends Plugin implements ActionPlugin {
39+
40+
@Override
41+
public List<RestHandler> getRestHandlers(
42+
Settings settings,
43+
NamedWriteableRegistry namedWriteableRegistry,
44+
RestController restController,
45+
ClusterSettings clusterSettings,
46+
IndexScopedSettings indexScopedSettings,
47+
SettingsFilter settingsFilter,
48+
IndexNameExpressionResolver indexNameExpressionResolver,
49+
Supplier<DiscoveryNodes> nodesInCluster,
50+
Predicate<NodeFeature> clusterSupportsFeature
51+
) {
52+
if (DataStream.LOGS_STREAM_FEATURE_FLAG) {
53+
return List.of(new RestSetLogStreamsEnabledAction());
54+
}
55+
return Collections.emptyList();
56+
}
57+
58+
@Override
59+
public List<ActionHandler> getActions() {
60+
return List.of(new ActionHandler(LogsStreamsActivationToggleAction.INSTANCE, TransportLogsStreamsToggleActivation.class));
61+
}
62+
63+
@Override
64+
public List<NamedWriteableRegistry.Entry> getNamedWriteables() {
65+
return List.of(new NamedWriteableRegistry.Entry(Metadata.ProjectCustom.class, StreamsMetadata.TYPE, StreamsMetadata::new));
66+
}
67+
}
Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.rest.streams.logs;
11+
12+
import org.elasticsearch.action.ActionRequestValidationException;
13+
import org.elasticsearch.action.ActionType;
14+
import org.elasticsearch.action.support.master.AcknowledgedRequest;
15+
import org.elasticsearch.action.support.master.AcknowledgedResponse;
16+
import org.elasticsearch.common.io.stream.StreamInput;
17+
import org.elasticsearch.common.io.stream.StreamOutput;
18+
import org.elasticsearch.core.TimeValue;
19+
20+
import java.io.IOException;
21+
22+
public class LogsStreamsActivationToggleAction {
23+
24+
public static ActionType<AcknowledgedResponse> INSTANCE = new ActionType<>("cluster:admin/streams/logs/toggle");
25+
26+
public static class Request extends AcknowledgedRequest<Request> {
27+
28+
private final boolean enable;
29+
30+
public Request(TimeValue masterNodeTimeout, TimeValue ackTimeout, boolean enable) {
31+
super(masterNodeTimeout, ackTimeout);
32+
this.enable = enable;
33+
}
34+
35+
public Request(StreamInput in) throws IOException {
36+
super(in);
37+
this.enable = in.readBoolean();
38+
}
39+
40+
@Override
41+
public void writeTo(StreamOutput out) throws IOException {
42+
super.writeTo(out);
43+
out.writeBoolean(enable);
44+
}
45+
46+
@Override
47+
public ActionRequestValidationException validate() {
48+
return null;
49+
}
50+
51+
@Override
52+
public String toString() {
53+
return "LogsStreamsActivationToggleAction.Request{" + "enable=" + enable + '}';
54+
}
55+
56+
public boolean shouldEnable() {
57+
return enable;
58+
}
59+
}
60+
}
Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.rest.streams.logs;
11+
12+
import org.elasticsearch.client.internal.node.NodeClient;
13+
import org.elasticsearch.rest.BaseRestHandler;
14+
import org.elasticsearch.rest.RestRequest;
15+
import org.elasticsearch.rest.RestUtils;
16+
import org.elasticsearch.rest.Scope;
17+
import org.elasticsearch.rest.ServerlessScope;
18+
import org.elasticsearch.rest.action.RestToXContentListener;
19+
20+
import java.io.IOException;
21+
import java.util.List;
22+
23+
import static org.elasticsearch.rest.RestRequest.Method.GET;
24+
25+
@ServerlessScope(Scope.PUBLIC)
26+
public class RestSetLogStreamsEnabledAction extends BaseRestHandler {
27+
@Override
28+
public String getName() {
29+
return "streams_logs_set_enabled_action";
30+
}
31+
32+
// TODO: Drop note in streams channel to check if it's OK to have one security permission for both enable and disable
33+
@Override
34+
public List<Route> routes() {
35+
return List.of(new Route(GET, "/_streams/logs/_enable"), new Route(GET, "/_streams/logs/_disable"));
36+
}
37+
38+
@Override
39+
protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException {
40+
final boolean enabled = request.path().endsWith("_enable");
41+
assert enabled || request.path().endsWith("_disable");
42+
return restChannel -> client.execute(
43+
LogsStreamsActivationToggleAction.INSTANCE,
44+
new LogsStreamsActivationToggleAction.Request(
45+
RestUtils.getMasterNodeTimeout(request),
46+
RestUtils.getAckTimeout(request),
47+
enabled
48+
),
49+
new RestToXContentListener<>(restChannel)
50+
);
51+
}
52+
53+
}

0 commit comments

Comments
 (0)