Skip to content

Commit 6d1b654

Browse files
committed
PR Fixes - Switch to local metadata action type and improve request handling
1 parent 8fa2ccd commit 6d1b654

File tree

7 files changed

+59
-47
lines changed

7 files changed

+59
-47
lines changed

modules/streams/build.gradle

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,12 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
110
apply plugin: 'elasticsearch.test-with-dependencies'
211
apply plugin: 'elasticsearch.internal-cluster-test'
312
apply plugin: 'elasticsearch.internal-yaml-rest-test'
@@ -11,8 +20,7 @@ esplugin {
1120

1221
restResources {
1322
restApi {
14-
// TODO: Limit this down to just required API's for faster build. See https://github.com/elastic/elasticsearch/blob/fb3149cc664eb7061d55741a87e7e8cf29db4989/TESTING.asciidoc#L443
15-
include '*'
23+
include '_common', 'streams'
1624
}
1725
}
1826

modules/streams/src/main/java/org/elasticsearch/rest/streams/logs/RestStreamsStatusAction.java

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import org.elasticsearch.rest.RestUtils;
1717
import org.elasticsearch.rest.Scope;
1818
import org.elasticsearch.rest.ServerlessScope;
19+
import org.elasticsearch.rest.action.RestCancellableNodeClient;
1920
import org.elasticsearch.rest.action.RestToXContentListener;
2021

2122
import java.util.Collections;
@@ -27,7 +28,7 @@
2728
@ServerlessScope(Scope.PUBLIC)
2829
public class RestStreamsStatusAction extends BaseRestHandler {
2930

30-
public static final Set<String> SUPPORTED_PARAMS = Collections.singleton(RestUtils.REST_MASTER_TIMEOUT_PARAM);
31+
public static final Set<String> SUPPORTED_PARAMS = Collections.singleton(RestUtils.REST_TIMEOUT_PARAM);
3132

3233
@Override
3334
public String getName() {
@@ -41,8 +42,12 @@ public List<RestHandler.Route> routes() {
4142

4243
@Override
4344
protected BaseRestHandler.RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) {
44-
StreamsStatusAction.Request statusRequest = new StreamsStatusAction.Request(RestUtils.getMasterNodeTimeout(request));
45-
return restChannel -> client.execute(StreamsStatusAction.INSTANCE, statusRequest, new RestToXContentListener<>(restChannel));
45+
StreamsStatusAction.Request statusRequest = new StreamsStatusAction.Request(RestUtils.getAckTimeout(request));
46+
return restChannel -> new RestCancellableNodeClient(client, request.getHttpChannel()).execute(
47+
StreamsStatusAction.INSTANCE,
48+
statusRequest,
49+
new RestToXContentListener<>(restChannel)
50+
);
4651
}
4752

4853
@Override

modules/streams/src/main/java/org/elasticsearch/rest/streams/logs/StreamsStatusAction.java

Lines changed: 10 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -9,40 +9,33 @@
99

1010
package org.elasticsearch.rest.streams.logs;
1111

12-
import org.elasticsearch.action.ActionRequestValidationException;
1312
import org.elasticsearch.action.ActionResponse;
1413
import org.elasticsearch.action.ActionType;
15-
import org.elasticsearch.action.support.master.MasterNodeReadRequest;
14+
import org.elasticsearch.action.support.local.LocalClusterStateRequest;
1615
import org.elasticsearch.common.io.stream.StreamInput;
1716
import org.elasticsearch.common.io.stream.StreamOutput;
1817
import org.elasticsearch.core.TimeValue;
18+
import org.elasticsearch.tasks.CancellableTask;
19+
import org.elasticsearch.tasks.Task;
20+
import org.elasticsearch.tasks.TaskId;
1921
import org.elasticsearch.xcontent.ToXContentObject;
2022
import org.elasticsearch.xcontent.XContentBuilder;
2123

2224
import java.io.IOException;
25+
import java.util.Map;
2326

2427
public class StreamsStatusAction {
2528

2629
public static ActionType<Response> INSTANCE = new ActionType<>("cluster:admin/streams/status");
2730

28-
public static class Request extends MasterNodeReadRequest<Request> {
29-
30-
public Request(TimeValue masterNodeTimeout) {
31-
super(masterNodeTimeout);
32-
}
33-
34-
public Request(StreamInput in) throws IOException {
35-
super(in);
31+
public static class Request extends LocalClusterStateRequest {
32+
protected Request(TimeValue masterTimeout) {
33+
super(masterTimeout);
3634
}
3735

3836
@Override
39-
public ActionRequestValidationException validate() {
40-
return null;
41-
}
42-
43-
@Override
44-
public void writeTo(StreamOutput out) throws IOException {
45-
super.writeTo(out);
37+
public Task createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
38+
return new CancellableTask(id, type, action, "Streams status request", parentTaskId, headers);
4639
}
4740
}
4841

modules/streams/src/main/java/org/elasticsearch/rest/streams/logs/TransportStreamsStatusAction.java

Lines changed: 18 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -11,11 +11,10 @@
1111

1212
import org.elasticsearch.action.ActionListener;
1313
import org.elasticsearch.action.support.ActionFilters;
14-
import org.elasticsearch.action.support.master.TransportMasterNodeReadAction;
15-
import org.elasticsearch.cluster.ClusterState;
14+
import org.elasticsearch.action.support.local.TransportLocalProjectMetadataAction;
15+
import org.elasticsearch.cluster.ProjectState;
1616
import org.elasticsearch.cluster.block.ClusterBlockException;
1717
import org.elasticsearch.cluster.block.ClusterBlockLevel;
18-
import org.elasticsearch.cluster.metadata.ProjectId;
1918
import org.elasticsearch.cluster.project.ProjectResolver;
2019
import org.elasticsearch.cluster.service.ClusterService;
2120
import org.elasticsearch.injection.guice.Inject;
@@ -28,9 +27,9 @@
2827
* Transport action to retrieve the status of logs streams in a project / cluster.
2928
* Results are broken down by stream type. Currently only logs streams are implemented.
3029
*/
31-
public class TransportStreamsStatusAction extends TransportMasterNodeReadAction<StreamsStatusAction.Request, StreamsStatusAction.Response> {
32-
33-
private final ProjectResolver projectResolver;
30+
public class TransportStreamsStatusAction extends TransportLocalProjectMetadataAction<
31+
StreamsStatusAction.Request,
32+
StreamsStatusAction.Response> {
3433

3534
@Inject
3635
public TransportStreamsStatusAction(
@@ -42,33 +41,29 @@ public TransportStreamsStatusAction(
4241
) {
4342
super(
4443
StreamsStatusAction.INSTANCE.name(),
45-
transportService,
46-
clusterService,
47-
threadPool,
4844
actionFilters,
49-
StreamsStatusAction.Request::new,
50-
StreamsStatusAction.Response::new,
51-
threadPool.executor(ThreadPool.Names.MANAGEMENT)
45+
transportService.getTaskManager(),
46+
clusterService,
47+
threadPool.executor(ThreadPool.Names.MANAGEMENT),
48+
projectResolver
5249
);
53-
this.projectResolver = projectResolver;
5450
}
5551

5652
@Override
57-
protected void masterOperation(
53+
protected ClusterBlockException checkBlock(StreamsStatusAction.Request request, ProjectState state) {
54+
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_READ);
55+
}
56+
57+
@Override
58+
protected void localClusterStateOperation(
5859
Task task,
5960
StreamsStatusAction.Request request,
60-
ClusterState state,
61+
ProjectState state,
6162
ActionListener<StreamsStatusAction.Response> listener
62-
) throws Exception {
63-
ProjectId projectId = projectResolver.getProjectId();
64-
StreamsMetadata streamsState = state.projectState(projectId).metadata().custom(StreamsMetadata.TYPE, StreamsMetadata.EMPTY);
63+
) {
64+
StreamsMetadata streamsState = state.metadata().custom(StreamsMetadata.TYPE, StreamsMetadata.EMPTY);
6565
boolean logsEnabled = streamsState.isLogsEnabled();
6666
StreamsStatusAction.Response response = new StreamsStatusAction.Response(logsEnabled);
6767
listener.onResponse(response);
6868
}
69-
70-
@Override
71-
protected ClusterBlockException checkBlock(StreamsStatusAction.Request request, ClusterState state) {
72-
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_READ);
73-
}
7469
}

rest-api-spec/src/main/resources/rest-api-spec/api/streams.status.json

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,9 @@
2323
]
2424
},
2525
"params": {
26-
"master_timeout": {
26+
"timeout": {
2727
"type": "time",
28-
"description": "Period to wait for a connection to the master node. If no response is received before the timeout expires, the request fails and returns an error."
28+
"description": "Period to wait for a response. If no response is received before the timeout expires, the request fails and returns an error."
2929
}
3030
}
3131
}

server/src/main/java/org/elasticsearch/TransportVersions.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -299,6 +299,7 @@ static TransportVersion def(int id) {
299299
public static final TransportVersion NONE_CHUNKING_STRATEGY = def(9_097_0_00);
300300
public static final TransportVersion PROJECT_DELETION_GLOBAL_BLOCK = def(9_098_0_00);
301301
public static final TransportVersion STREAMS_LOGS_SUPPORT = def(9_099_0_00);
302+
302303
/*
303304
* STOP! READ THIS FIRST! No, really,
304305
* ____ _____ ___ ____ _ ____ _____ _ ____ _____ _ _ ___ ____ _____ ___ ____ ____ _____ _

x-pack/qa/multi-project/core-rest-tests-with-multiple-projects/build.gradle

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,13 @@
11
apply plugin: 'elasticsearch.internal-yaml-rest-test'
2+
/*
3+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
4+
* or more contributor license agreements. Licensed under the "Elastic License
5+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
6+
* Public License v 1"; you may not use this file except in compliance with, at
7+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
8+
* License v3.0 only", or the "Server Side Public License, v 1".
9+
*/
10+
211
import org.elasticsearch.gradle.util.GradleUtils
312

413
dependencies {
@@ -21,6 +30,7 @@ dependencies {
2130
restTestConfig project(path: ':modules:data-streams', configuration: "basicRestSpecs")
2231
restTestConfig project(path: ':modules:ingest-common', configuration: "basicRestSpecs")
2332
restTestConfig project(path: ':modules:reindex', configuration: "basicRestSpecs")
33+
restTestConfig project(path: ':modules:streams', configuration: "basicRestSpecs")
2434
}
2535

2636
// let the yamlRestTests see the classpath of test

0 commit comments

Comments
 (0)