Skip to content

Commit 2204296

Browse files
committed
PR Fixes - Enforce local only use for status action
1 parent faf7dab commit 2204296

File tree

3 files changed

+6
-9
lines changed

3 files changed

+6
-9
lines changed

modules/streams/src/main/java/org/elasticsearch/rest/streams/logs/StreamsStatusAction.java

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,8 @@
1111

1212
import org.elasticsearch.action.ActionResponse;
1313
import org.elasticsearch.action.ActionType;
14+
import org.elasticsearch.action.support.TransportAction;
1415
import org.elasticsearch.action.support.local.LocalClusterStateRequest;
15-
import org.elasticsearch.common.io.stream.StreamInput;
1616
import org.elasticsearch.common.io.stream.StreamOutput;
1717
import org.elasticsearch.core.TimeValue;
1818
import org.elasticsearch.tasks.CancellableTask;
@@ -47,13 +47,9 @@ public Response(boolean logsEnabled) {
4747
logs_enabled = logsEnabled;
4848
}
4949

50-
public Response(StreamInput in) throws IOException {
51-
logs_enabled = in.readBoolean();
52-
}
53-
5450
@Override
5551
public void writeTo(StreamOutput out) throws IOException {
56-
out.writeBoolean(logs_enabled);
52+
TransportAction.localOnly();
5753
}
5854

5955
@Override

modules/streams/src/main/java/org/elasticsearch/rest/streams/logs/TransportLogsStreamsToggleActivation.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
import org.elasticsearch.action.support.master.AcknowledgedTransportMasterNodeAction;
1919
import org.elasticsearch.cluster.AckedClusterStateUpdateTask;
2020
import org.elasticsearch.cluster.ClusterState;
21-
import org.elasticsearch.cluster.SequentialTaskAckingTaskExecutor;
21+
import org.elasticsearch.cluster.SequentialAckingBatchedTaskExecutor;
2222
import org.elasticsearch.cluster.block.ClusterBlockException;
2323
import org.elasticsearch.cluster.block.ClusterBlockLevel;
2424
import org.elasticsearch.cluster.metadata.ProjectId;
@@ -64,7 +64,7 @@ public TransportLogsStreamsToggleActivation(
6464
this.taskQueue = clusterService.createTaskQueue(
6565
"streams-update-state-queue",
6666
Priority.NORMAL,
67-
new SequentialTaskAckingTaskExecutor<>()
67+
new SequentialAckingBatchedTaskExecutor<>()
6868
);
6969
this.projectResolver = projectResolver;
7070
}

server/src/main/java/org/elasticsearch/cluster/SequentialTaskAckingTaskExecutor.java renamed to server/src/main/java/org/elasticsearch/cluster/SequentialAckingBatchedTaskExecutor.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,8 @@
1818
*
1919
* @param <Task> The type of the task that extends {@link AckedClusterStateUpdateTask}.
2020
*/
21-
public class SequentialTaskAckingTaskExecutor<Task extends AckedClusterStateUpdateTask> extends SimpleBatchedAckListenerTaskExecutor<Task> {
21+
public class SequentialAckingBatchedTaskExecutor<Task extends AckedClusterStateUpdateTask> extends SimpleBatchedAckListenerTaskExecutor<
22+
Task> {
2223
@Override
2324
public Tuple<ClusterState, ClusterStateAckListener> executeTask(Task task, ClusterState clusterState) throws Exception {
2425
return Tuple.tuple(task.execute(clusterState), task);

0 commit comments

Comments
 (0)