Skip to content

Commit 2fa713f

Browse files
committed
Status Endpoint
1 parent 63f07aa commit 2fa713f

File tree

6 files changed

+205
-5
lines changed

6 files changed

+205
-5
lines changed

modules/streams/src/main/java/org/elasticsearch/rest/streams/StreamsPlugin.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,10 @@
2525
import org.elasticsearch.rest.RestHandler;
2626
import org.elasticsearch.rest.streams.logs.LogsStreamsActivationToggleAction;
2727
import org.elasticsearch.rest.streams.logs.RestSetLogStreamsEnabledAction;
28+
import org.elasticsearch.rest.streams.logs.RestStreamsStatusAction;
29+
import org.elasticsearch.rest.streams.logs.StreamsStatusAction;
2830
import org.elasticsearch.rest.streams.logs.TransportLogsStreamsToggleActivation;
31+
import org.elasticsearch.rest.streams.logs.TransportStreamsStatusAction;
2932

3033
import java.util.Collections;
3134
import java.util.List;
@@ -50,14 +53,17 @@ public List<RestHandler> getRestHandlers(
5053
Predicate<NodeFeature> clusterSupportsFeature
5154
) {
5255
if (DataStream.LOGS_STREAM_FEATURE_FLAG) {
53-
return List.of(new RestSetLogStreamsEnabledAction());
56+
return List.of(new RestSetLogStreamsEnabledAction(), new RestStreamsStatusAction());
5457
}
5558
return Collections.emptyList();
5659
}
5760

5861
@Override
5962
public List<ActionHandler> getActions() {
60-
return List.of(new ActionHandler(LogsStreamsActivationToggleAction.INSTANCE, TransportLogsStreamsToggleActivation.class));
63+
return List.of(
64+
new ActionHandler(LogsStreamsActivationToggleAction.INSTANCE, TransportLogsStreamsToggleActivation.class),
65+
new ActionHandler(StreamsStatusAction.INSTANCE, TransportStreamsStatusAction.class)
66+
);
6167
}
6268

6369
@Override

modules/streams/src/main/java/org/elasticsearch/rest/streams/logs/RestSetLogStreamsEnabledAction.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@ public String getName() {
2929
return "streams_logs_set_enabled_action";
3030
}
3131

32-
// TODO: Drop note in streams channel to check if it's OK to have one security permission for both enable and disable
3332
@Override
3433
public List<Route> routes() {
3534
return List.of(new Route(GET, "/_streams/logs/_enable"), new Route(GET, "/_streams/logs/_disable"));
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.rest.streams.logs;
11+
12+
import org.elasticsearch.client.internal.node.NodeClient;
13+
import org.elasticsearch.rest.BaseRestHandler;
14+
import org.elasticsearch.rest.RestHandler;
15+
import org.elasticsearch.rest.RestRequest;
16+
import org.elasticsearch.rest.RestUtils;
17+
import org.elasticsearch.rest.Scope;
18+
import org.elasticsearch.rest.ServerlessScope;
19+
import org.elasticsearch.rest.action.RestToXContentListener;
20+
21+
import java.io.IOException;
22+
import java.util.List;
23+
24+
import static org.elasticsearch.rest.RestRequest.Method.GET;
25+
26+
@ServerlessScope(Scope.PUBLIC)
27+
public class RestStreamsStatusAction extends BaseRestHandler {
28+
29+
@Override
30+
public String getName() {
31+
return "streams_status_action";
32+
}
33+
34+
@Override
35+
public List<RestHandler.Route> routes() {
36+
return List.of(new RestHandler.Route(GET, "/_streams/_status"));
37+
}
38+
39+
@Override
40+
protected BaseRestHandler.RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException {
41+
return restChannel -> client.execute(
42+
StreamsStatusAction.INSTANCE,
43+
new StreamsStatusAction.Request(RestUtils.getMasterNodeTimeout(request)),
44+
new RestToXContentListener<>(restChannel)
45+
);
46+
}
47+
48+
}
Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.rest.streams.logs;
11+
12+
import org.elasticsearch.action.ActionRequestValidationException;
13+
import org.elasticsearch.action.ActionResponse;
14+
import org.elasticsearch.action.ActionType;
15+
import org.elasticsearch.action.support.master.MasterNodeReadRequest;
16+
import org.elasticsearch.common.io.stream.StreamInput;
17+
import org.elasticsearch.common.io.stream.StreamOutput;
18+
import org.elasticsearch.core.TimeValue;
19+
import org.elasticsearch.xcontent.ToXContentObject;
20+
import org.elasticsearch.xcontent.XContentBuilder;
21+
22+
import java.io.IOException;
23+
24+
public class StreamsStatusAction {
25+
26+
public static ActionType<Response> INSTANCE = new ActionType<>("cluster:admin/streams/status");
27+
28+
public static class Request extends MasterNodeReadRequest<Request> {
29+
30+
public Request(TimeValue masterNodeTimeout) {
31+
super(masterNodeTimeout);
32+
}
33+
34+
public Request(StreamInput in) throws IOException {
35+
super(in);
36+
}
37+
38+
@Override
39+
public ActionRequestValidationException validate() {
40+
return null;
41+
}
42+
43+
@Override
44+
public void writeTo(StreamOutput out) throws IOException {
45+
super.writeTo(out);
46+
}
47+
}
48+
49+
public static class Response extends ActionResponse implements ToXContentObject {
50+
51+
private final boolean logs_enabled;
52+
53+
public Response(boolean logsEnabled) {
54+
logs_enabled = logsEnabled;
55+
}
56+
57+
public Response(StreamInput in) throws IOException {
58+
logs_enabled = in.readBoolean();
59+
}
60+
61+
@Override
62+
public void writeTo(StreamOutput out) throws IOException {
63+
out.writeBoolean(logs_enabled);
64+
}
65+
66+
@Override
67+
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
68+
builder.startObject();
69+
70+
builder.startObject("logs");
71+
builder.field("enabled", logs_enabled);
72+
builder.endObject();
73+
74+
builder.endObject();
75+
return builder;
76+
}
77+
}
78+
}

modules/streams/src/main/java/org/elasticsearch/rest/streams/logs/TransportLogsStreamsToggleActivation.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,15 +43,14 @@ public class TransportLogsStreamsToggleActivation extends AcknowledgedTransportM
4343

4444
@Inject
4545
public TransportLogsStreamsToggleActivation(
46-
String actionName,
4746
TransportService transportService,
4847
ClusterService clusterService,
4948
ThreadPool threadPool,
5049
ActionFilters actionFilters,
5150
ProjectResolver projectResolver
5251
) {
5352
super(
54-
actionName,
53+
LogsStreamsActivationToggleAction.INSTANCE.name(),
5554
transportService,
5655
clusterService,
5756
threadPool,
Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.rest.streams.logs;
11+
12+
import org.elasticsearch.action.ActionListener;
13+
import org.elasticsearch.action.support.ActionFilters;
14+
import org.elasticsearch.action.support.master.TransportMasterNodeReadAction;
15+
import org.elasticsearch.cluster.ClusterState;
16+
import org.elasticsearch.cluster.block.ClusterBlockException;
17+
import org.elasticsearch.cluster.block.ClusterBlockLevel;
18+
import org.elasticsearch.cluster.metadata.ProjectId;
19+
import org.elasticsearch.cluster.project.ProjectResolver;
20+
import org.elasticsearch.cluster.service.ClusterService;
21+
import org.elasticsearch.injection.guice.Inject;
22+
import org.elasticsearch.rest.streams.StreamsMetadata;
23+
import org.elasticsearch.tasks.Task;
24+
import org.elasticsearch.threadpool.ThreadPool;
25+
import org.elasticsearch.transport.TransportService;
26+
27+
public class TransportStreamsStatusAction extends TransportMasterNodeReadAction<StreamsStatusAction.Request, StreamsStatusAction.Response> {
28+
29+
private final ProjectResolver projectResolver;
30+
31+
@Inject
32+
public TransportStreamsStatusAction(
33+
TransportService transportService,
34+
ClusterService clusterService,
35+
ThreadPool threadPool,
36+
ActionFilters actionFilters,
37+
ProjectResolver projectResolver
38+
) {
39+
super(
40+
StreamsStatusAction.INSTANCE.name(),
41+
transportService,
42+
clusterService,
43+
threadPool,
44+
actionFilters,
45+
StreamsStatusAction.Request::new,
46+
StreamsStatusAction.Response::new,
47+
threadPool.executor(ThreadPool.Names.MANAGEMENT)
48+
);
49+
this.projectResolver = projectResolver;
50+
}
51+
52+
@Override
53+
protected void masterOperation(
54+
Task task,
55+
StreamsStatusAction.Request request,
56+
ClusterState state,
57+
ActionListener<StreamsStatusAction.Response> listener
58+
) throws Exception {
59+
ProjectId projectId = projectResolver.getProjectId();
60+
StreamsMetadata streamsState = state.projectState(projectId).metadata().custom(StreamsMetadata.TYPE, StreamsMetadata.EMPTY);
61+
boolean logsEnabled = streamsState.isLogsEnabled();
62+
StreamsStatusAction.Response response = new StreamsStatusAction.Response(logsEnabled);
63+
listener.onResponse(response);
64+
}
65+
66+
@Override
67+
protected ClusterBlockException checkBlock(StreamsStatusAction.Request request, ClusterState state) {
68+
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_READ);
69+
}
70+
}

0 commit comments

Comments
 (0)