Skip to content

Commit 8ee29e5

Browse files
committed
Working :D
1 parent c41af5a commit 8ee29e5

File tree

4 files changed

+48
-46
lines changed

4 files changed

+48
-46
lines changed

modules/streams/src/main/java/org/elasticsearch/rest/streams/StreamsMetadata.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -23,10 +23,14 @@
2323
import java.util.EnumSet;
2424
import java.util.Iterator;
2525

26-
// TODO: Make Metadata.ProjectCustom
27-
public class StreamsMetadata extends AbstractNamedDiffable<Metadata.ClusterCustom> implements Metadata.ClusterCustom {
26+
/**
27+
* Metadata for the Streams feature, which allows enabling or disabling logs for data streams.
28+
* This class implements the Metadata.ProjectCustom interface to allow it to be stored in the cluster state.
29+
*/
30+
public class StreamsMetadata extends AbstractNamedDiffable<Metadata.ProjectCustom> implements Metadata.ProjectCustom {
2831

2932
public static final String TYPE = "streams";
33+
public static final StreamsMetadata EMPTY = new StreamsMetadata(false);
3034

3135
public boolean logsEnabled;
3236

@@ -68,11 +72,7 @@ public void writeTo(StreamOutput out) throws IOException {
6872

6973
@Override
7074
public Iterator<? extends ToXContent> toXContentChunked(ToXContent.Params params) {
71-
return Iterators.concat(
72-
ChunkedToXContentHelper.startObject(),
73-
ChunkedToXContentHelper.chunk((builder, bParams) -> builder.field("logs_enabled", logsEnabled)),
74-
ChunkedToXContentHelper.endObject()
75-
);
75+
return Iterators.concat(ChunkedToXContentHelper.chunk((builder, bParams) -> builder.field("logs_enabled", logsEnabled)));
7676
}
7777

7878
}

modules/streams/src/main/java/org/elasticsearch/rest/streams/StreamsPlugin.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99

1010
package org.elasticsearch.rest.streams;
1111

12+
import org.elasticsearch.cluster.metadata.DataStream;
1213
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
1314
import org.elasticsearch.cluster.metadata.Metadata;
1415
import org.elasticsearch.cluster.node.DiscoveryNodes;
@@ -26,6 +27,7 @@
2627
import org.elasticsearch.rest.streams.logs.RestSetLogStreamsEnabledAction;
2728
import org.elasticsearch.rest.streams.logs.TransportLogsStreamsToggleActivation;
2829

30+
import java.util.Collections;
2931
import java.util.List;
3032
import java.util.function.Predicate;
3133
import java.util.function.Supplier;
@@ -47,8 +49,10 @@ public List<RestHandler> getRestHandlers(
4749
Supplier<DiscoveryNodes> nodesInCluster,
4850
Predicate<NodeFeature> clusterSupportsFeature
4951
) {
50-
// TODO: Check for feature flag
51-
return List.of(new RestSetLogStreamsEnabledAction());
52+
if (DataStream.LOGS_STREAM_FEATURE_FLAG) {
53+
return List.of(new RestSetLogStreamsEnabledAction());
54+
}
55+
return Collections.emptyList();
5256
}
5357

5458
@Override
@@ -59,6 +63,6 @@ public List<ActionHandler> getActions() {
5963
@Override
6064
public List<NamedWriteableRegistry.Entry> getNamedWriteables() {
6165
// TODO: Update to project custom
62-
return List.of(new NamedWriteableRegistry.Entry(Metadata.ClusterCustom.class, StreamsMetadata.TYPE, StreamsMetadata::new));
66+
return List.of(new NamedWriteableRegistry.Entry(Metadata.ProjectCustom.class, StreamsMetadata.TYPE, StreamsMetadata::new));
6367
}
6468
}

modules/streams/src/main/java/org/elasticsearch/rest/streams/logs/RestSetLogStreamsEnabledAction.java

Lines changed: 11 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -9,15 +9,13 @@
99

1010
package org.elasticsearch.rest.streams.logs;
1111

12-
import org.elasticsearch.action.ActionListener;
1312
import org.elasticsearch.client.internal.node.NodeClient;
1413
import org.elasticsearch.rest.BaseRestHandler;
1514
import org.elasticsearch.rest.RestRequest;
16-
import org.elasticsearch.rest.RestResponse;
17-
import org.elasticsearch.rest.RestStatus;
1815
import org.elasticsearch.rest.RestUtils;
1916
import org.elasticsearch.rest.Scope;
2017
import org.elasticsearch.rest.ServerlessScope;
18+
import org.elasticsearch.rest.action.RestToXContentListener;
2119

2220
import java.io.IOException;
2321
import java.util.List;
@@ -31,7 +29,7 @@ public String getName() {
3129
return "streams_logs_set_enabled_action";
3230
}
3331

34-
//TODO: Drop note in streams channel to check if it's OK to have one security permission for both enable and disable
32+
// TODO: Drop note in streams channel to check if it's OK to have one security permission for both enable and disable
3533
@Override
3634
public List<Route> routes() {
3735
return List.of(new Route(GET, "/_streams/logs/_enable"), new Route(GET, "/_streams/logs/_disable"));
@@ -41,22 +39,15 @@ public List<Route> routes() {
4139
protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException {
4240
final boolean enabled = request.path().endsWith("_enable");
4341
assert enabled || request.path().endsWith("_disable");
44-
return restChannel -> {
45-
client.execute(
46-
LogsStreamsActivationToggleAction.INSTANCE,
47-
new LogsStreamsActivationToggleAction.Request(
48-
RestUtils.getMasterNodeTimeout(request),
49-
RestUtils.getTimeout(request),
50-
enabled
51-
),
52-
ActionListener.wrap(
53-
response -> restChannel.sendResponse(
54-
new RestResponse(RestStatus.OK, response.isAcknowledged() ? "enabled" : "disabled")
55-
),
56-
e -> restChannel.sendResponse(new RestResponse(RestStatus.INTERNAL_SERVER_ERROR, e.getMessage()))
57-
)
58-
);
59-
};
42+
return restChannel -> client.execute(
43+
LogsStreamsActivationToggleAction.INSTANCE,
44+
new LogsStreamsActivationToggleAction.Request(
45+
RestUtils.getMasterNodeTimeout(request),
46+
RestUtils.getAckTimeout(request),
47+
enabled
48+
),
49+
new RestToXContentListener<>(restChannel)
50+
);
6051
}
6152

6253
}

modules/streams/src/main/java/org/elasticsearch/rest/streams/logs/TransportLogsStreamsToggleActivation.java

Lines changed: 23 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@
2121
import org.elasticsearch.cluster.SequentialTaskAckingTaskExecutor;
2222
import org.elasticsearch.cluster.block.ClusterBlockException;
2323
import org.elasticsearch.cluster.block.ClusterBlockLevel;
24+
import org.elasticsearch.cluster.metadata.ProjectId;
25+
import org.elasticsearch.cluster.project.ProjectResolver;
2426
import org.elasticsearch.cluster.service.ClusterService;
2527
import org.elasticsearch.cluster.service.MasterServiceTaskQueue;
2628
import org.elasticsearch.common.Priority;
@@ -30,11 +32,13 @@
3032
import org.elasticsearch.threadpool.ThreadPool;
3133
import org.elasticsearch.transport.TransportService;
3234

33-
// TODO: Are master actions synced or do I need double-check locking?
35+
import java.util.Locale;
36+
3437
public class TransportLogsStreamsToggleActivation extends AcknowledgedTransportMasterNodeAction<LogsStreamsActivationToggleAction.Request> {
3538

3639
private static final Logger logger = LogManager.getLogger(TransportLogsStreamsToggleActivation.class);
3740

41+
private final ProjectResolver projectResolver;
3842
private final MasterServiceTaskQueue<StreamsMetadataUpdateTask> taskQueue;
3943

4044
@Inject
@@ -43,7 +47,8 @@ public TransportLogsStreamsToggleActivation(
4347
TransportService transportService,
4448
ClusterService clusterService,
4549
ThreadPool threadPool,
46-
ActionFilters actionFilters
50+
ActionFilters actionFilters,
51+
ProjectResolver projectResolver
4752
) {
4853
super(
4954
actionName,
@@ -59,6 +64,7 @@ public TransportLogsStreamsToggleActivation(
5964
Priority.NORMAL,
6065
new SequentialTaskAckingTaskExecutor<>()
6166
);
67+
this.projectResolver = projectResolver;
6268
}
6369

6470
@Override
@@ -68,20 +74,18 @@ protected void masterOperation(
6874
ClusterState state,
6975
ActionListener<AcknowledgedResponse> listener
7076
) throws Exception {
71-
// TODO: Short circuit if state == requested state. Listener & return
72-
StreamsMetadata streamsState = state.metadata().custom(StreamsMetadata.TYPE);
77+
ProjectId projectId = projectResolver.getProjectId();
78+
StreamsMetadata streamsState = state.projectState(projectId).metadata().custom(StreamsMetadata.TYPE, StreamsMetadata.EMPTY);
7379
boolean currentlyEnabled = streamsState.isLogsEnabled();
7480
boolean shouldEnable = request.shouldEnable();
75-
// TODO: Remove hook methods. Service will be listening to cluster state for changes
7681
if (shouldEnable != currentlyEnabled) {
77-
if (shouldEnable) {
78-
setupActions();
79-
} else {
80-
cleanupActions();
81-
}
82-
StreamsMetadataUpdateTask updateTask = new StreamsMetadataUpdateTask(request, listener, shouldEnable);
82+
StreamsMetadataUpdateTask updateTask = new StreamsMetadataUpdateTask(request, listener, projectId, shouldEnable);
8383
// TODO: Append state to task name (See TransportStartSLMAction)
84-
taskQueue.submitTask("enable-streams-logs-", updateTask, updateTask.timeout());
84+
String taskName = String.format(Locale.ROOT, "enable-streams-logs-[%s]", shouldEnable ? "enable" : "disable");
85+
taskQueue.submitTask(taskName, updateTask, updateTask.timeout());
86+
} else {
87+
logger.debug("Logs streams are already in the requested state: {}", shouldEnable);
88+
listener.onResponse(AcknowledgedResponse.TRUE);
8589
}
8690
}
8791

@@ -99,23 +103,26 @@ protected ClusterBlockException checkBlock(LogsStreamsActivationToggleAction.Req
99103
}
100104

101105
static class StreamsMetadataUpdateTask extends AckedClusterStateUpdateTask {
106+
private ProjectId projectId;
102107
private final boolean enabled;
103108

104109
StreamsMetadataUpdateTask(
105110
AcknowledgedRequest<?> request,
106111
ActionListener<? extends AcknowledgedResponse> listener,
112+
ProjectId projectId,
107113
boolean enabled
108114
) {
109115
super(request, listener);
116+
this.projectId = projectId;
110117
this.enabled = enabled;
111118
}
112119

113120
@Override
114121
public ClusterState execute(ClusterState currentState) throws Exception {
115-
// TODO: Builder?
116-
StreamsMetadata metadata = currentState.metadata().custom(StreamsMetadata.TYPE);
117-
metadata.setLogsEnabled(enabled);
118-
return currentState;
122+
return currentState.copyAndUpdateProject(
123+
projectId,
124+
builder -> builder.putCustom(StreamsMetadata.TYPE, new StreamsMetadata(enabled))
125+
);
119126
}
120127
}
121128
}

0 commit comments

Comments
 (0)