Skip to content

Commit 5efe216

Browse files
authored
Run GetPipelineTransportAction on local node (#120445)
This action solely needs the cluster state, it can run on any node. Additionally, it needs to be cancellable to avoid doing unnecessary work after a client failure or timeout. Relates #101805
1 parent f404da0 commit 5efe216

File tree

8 files changed

+67
-109
lines changed

8 files changed

+67
-109
lines changed

docs/changelog/120445.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 120445
2+
summary: Run `GetPipelineTransportAction` on local node
3+
area: Ingest Node
4+
type: enhancement
5+
issues: []

qa/smoke-test-http/src/javaRestTest/java/org/elasticsearch/http/RestActionCancellationIT.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.elasticsearch.action.admin.indices.template.get.GetIndexTemplatesAction;
2222
import org.elasticsearch.action.admin.indices.template.post.SimulateIndexTemplateAction;
2323
import org.elasticsearch.action.admin.indices.template.post.SimulateTemplateAction;
24+
import org.elasticsearch.action.ingest.GetPipelineAction;
2425
import org.elasticsearch.action.support.CancellableActionTestPlugin;
2526
import org.elasticsearch.action.support.PlainActionFuture;
2627
import org.elasticsearch.action.support.RefCountingListener;
@@ -103,6 +104,10 @@ public void testClusterGetSettingsCancellation() {
103104
runRestActionCancellationTest(new Request(HttpGet.METHOD_NAME, "/_cluster/settings"), ClusterGetSettingsAction.NAME);
104105
}
105106

107+
public void testGetPipelineCancellation() {
108+
runRestActionCancellationTest(new Request(HttpGet.METHOD_NAME, "/_ingest/pipeline"), GetPipelineAction.NAME);
109+
}
110+
106111
private void runRestActionCancellationTest(Request request, String actionName) {
107112
final var node = usually() ? internalCluster().getRandomNodeName() : internalCluster().startCoordinatingOnlyNode(Settings.EMPTY);
108113

server/src/main/java/org/elasticsearch/action/ingest/GetPipelineRequest.java

Lines changed: 17 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -10,14 +10,18 @@
1010
package org.elasticsearch.action.ingest;
1111

1212
import org.elasticsearch.action.ActionRequestValidationException;
13-
import org.elasticsearch.action.support.master.MasterNodeReadRequest;
13+
import org.elasticsearch.action.support.local.LocalClusterStateRequest;
1414
import org.elasticsearch.common.io.stream.StreamInput;
15-
import org.elasticsearch.common.io.stream.StreamOutput;
1615
import org.elasticsearch.core.TimeValue;
16+
import org.elasticsearch.core.UpdateForV10;
17+
import org.elasticsearch.tasks.CancellableTask;
18+
import org.elasticsearch.tasks.Task;
19+
import org.elasticsearch.tasks.TaskId;
1720

1821
import java.io.IOException;
22+
import java.util.Map;
1923

20-
public class GetPipelineRequest extends MasterNodeReadRequest<GetPipelineRequest> {
24+
public class GetPipelineRequest extends LocalClusterStateRequest {
2125

2226
private final String[] ids;
2327
private final boolean summary;
@@ -35,19 +39,17 @@ public GetPipelineRequest(TimeValue masterNodeTimeout, String... ids) {
3539
this(masterNodeTimeout, false, ids);
3640
}
3741

42+
/**
43+
* NB prior to 9.0 this was a TransportMasterNodeReadAction so for BwC we must remain able to read these requests until
44+
* we no longer need to support calling this action remotely.
45+
*/
46+
@UpdateForV10(owner = UpdateForV10.Owner.DATA_MANAGEMENT)
3847
public GetPipelineRequest(StreamInput in) throws IOException {
3948
super(in);
4049
ids = in.readStringArray();
4150
summary = in.readBoolean();
4251
}
4352

44-
@Override
45-
public void writeTo(StreamOutput out) throws IOException {
46-
super.writeTo(out);
47-
out.writeStringArray(ids);
48-
out.writeBoolean(summary);
49-
}
50-
5153
public String[] getIds() {
5254
return ids;
5355
}
@@ -60,4 +62,9 @@ public boolean isSummary() {
6062
public ActionRequestValidationException validate() {
6163
return null;
6264
}
65+
66+
@Override
67+
public Task createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
68+
return new CancellableTask(id, type, action, "", parentTaskId, headers);
69+
}
6370
}

server/src/main/java/org/elasticsearch/action/ingest/GetPipelineResponse.java

Lines changed: 6 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -11,15 +11,14 @@
1111

1212
import org.elasticsearch.action.ActionResponse;
1313
import org.elasticsearch.common.Strings;
14-
import org.elasticsearch.common.io.stream.StreamInput;
1514
import org.elasticsearch.common.io.stream.StreamOutput;
15+
import org.elasticsearch.core.UpdateForV10;
1616
import org.elasticsearch.ingest.PipelineConfiguration;
1717
import org.elasticsearch.rest.RestStatus;
1818
import org.elasticsearch.xcontent.ToXContentObject;
1919
import org.elasticsearch.xcontent.XContentBuilder;
2020

2121
import java.io.IOException;
22-
import java.util.ArrayList;
2322
import java.util.Collections;
2423
import java.util.HashMap;
2524
import java.util.List;
@@ -30,16 +29,6 @@ public class GetPipelineResponse extends ActionResponse implements ToXContentObj
3029
private final List<PipelineConfiguration> pipelines;
3130
private final boolean summary;
3231

33-
public GetPipelineResponse(StreamInput in) throws IOException {
34-
super(in);
35-
int size = in.readVInt();
36-
pipelines = new ArrayList<>(size);
37-
for (int i = 0; i < size; i++) {
38-
pipelines.add(PipelineConfiguration.readFrom(in));
39-
}
40-
summary = in.readBoolean();
41-
}
42-
4332
public GetPipelineResponse(List<PipelineConfiguration> pipelines, boolean summary) {
4433
this.pipelines = pipelines;
4534
this.summary = summary;
@@ -58,6 +47,11 @@ public List<PipelineConfiguration> pipelines() {
5847
return Collections.unmodifiableList(pipelines);
5948
}
6049

50+
/**
51+
* NB prior to 9.0 this was a TransportMasterNodeReadAction so for BwC we must remain able to read these requests until
52+
* we no longer need to support calling this action remotely.
53+
*/
54+
@UpdateForV10(owner = UpdateForV10.Owner.DATA_MANAGEMENT)
6155
@Override
6256
public void writeTo(StreamOutput out) throws IOException {
6357
out.writeCollection(pipelines);

server/src/main/java/org/elasticsearch/action/ingest/GetPipelineTransportAction.java

Lines changed: 30 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -11,45 +11,56 @@
1111

1212
import org.elasticsearch.action.ActionListener;
1313
import org.elasticsearch.action.support.ActionFilters;
14-
import org.elasticsearch.action.support.master.TransportMasterNodeReadAction;
14+
import org.elasticsearch.action.support.ChannelActionListener;
15+
import org.elasticsearch.action.support.local.TransportLocalClusterStateAction;
1516
import org.elasticsearch.cluster.ClusterState;
1617
import org.elasticsearch.cluster.block.ClusterBlockException;
1718
import org.elasticsearch.cluster.block.ClusterBlockLevel;
18-
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
1919
import org.elasticsearch.cluster.service.ClusterService;
2020
import org.elasticsearch.common.util.concurrent.EsExecutors;
21+
import org.elasticsearch.core.UpdateForV10;
2122
import org.elasticsearch.ingest.IngestService;
2223
import org.elasticsearch.injection.guice.Inject;
24+
import org.elasticsearch.tasks.CancellableTask;
2325
import org.elasticsearch.tasks.Task;
24-
import org.elasticsearch.threadpool.ThreadPool;
2526
import org.elasticsearch.transport.TransportService;
2627

27-
public class GetPipelineTransportAction extends TransportMasterNodeReadAction<GetPipelineRequest, GetPipelineResponse> {
28+
public class GetPipelineTransportAction extends TransportLocalClusterStateAction<GetPipelineRequest, GetPipelineResponse> {
2829

30+
/**
31+
* NB prior to 9.0 this was a TransportMasterNodeReadAction so for BwC it must be registered with the TransportService until
32+
* we no longer need to support calling this action remotely.
33+
*/
34+
@UpdateForV10(owner = UpdateForV10.Owner.DATA_MANAGEMENT)
35+
@SuppressWarnings("this-escape")
2936
@Inject
30-
public GetPipelineTransportAction(
31-
ThreadPool threadPool,
32-
ClusterService clusterService,
33-
TransportService transportService,
34-
ActionFilters actionFilters,
35-
IndexNameExpressionResolver indexNameExpressionResolver
36-
) {
37+
public GetPipelineTransportAction(ClusterService clusterService, TransportService transportService, ActionFilters actionFilters) {
3738
super(
3839
GetPipelineAction.NAME,
39-
transportService,
40-
clusterService,
41-
threadPool,
4240
actionFilters,
43-
GetPipelineRequest::new,
44-
indexNameExpressionResolver,
45-
GetPipelineResponse::new,
41+
transportService.getTaskManager(),
42+
clusterService,
4643
EsExecutors.DIRECT_EXECUTOR_SERVICE
4744
);
45+
46+
transportService.registerRequestHandler(
47+
actionName,
48+
executor,
49+
false,
50+
true,
51+
GetPipelineRequest::new,
52+
(request, channel, task) -> executeDirect(task, request, new ChannelActionListener<>(channel))
53+
);
4854
}
4955

5056
@Override
51-
protected void masterOperation(Task task, GetPipelineRequest request, ClusterState state, ActionListener<GetPipelineResponse> listener)
52-
throws Exception {
57+
protected void localClusterStateOperation(
58+
Task task,
59+
GetPipelineRequest request,
60+
ClusterState state,
61+
ActionListener<GetPipelineResponse> listener
62+
) throws Exception {
63+
((CancellableTask) task).ensureNotCancelled();
5364
listener.onResponse(new GetPipelineResponse(IngestService.getPipelines(state, request.getIds()), request.isSummary()));
5465
}
5566

server/src/main/java/org/elasticsearch/rest/action/ingest/RestGetPipelineAction.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import org.elasticsearch.rest.RestRequest;
1919
import org.elasticsearch.rest.Scope;
2020
import org.elasticsearch.rest.ServerlessScope;
21+
import org.elasticsearch.rest.action.RestCancellableNodeClient;
2122
import org.elasticsearch.rest.action.RestToXContentListener;
2223

2324
import java.io.IOException;
@@ -46,7 +47,7 @@ public RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient cl
4647
restRequest.paramAsBoolean("summary", false),
4748
Strings.splitStringByCommaToArray(restRequest.param("id"))
4849
);
49-
return channel -> client.execute(
50+
return channel -> new RestCancellableNodeClient(client, restRequest.getHttpChannel()).execute(
5051
GetPipelineAction.INSTANCE,
5152
request,
5253
new RestToXContentListener<>(channel, GetPipelineResponse::status)

server/src/test/java/org/elasticsearch/action/ingest/GetPipelineResponseTests.java

Lines changed: 2 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -10,26 +10,23 @@
1010
package org.elasticsearch.action.ingest;
1111

1212
import org.elasticsearch.common.bytes.BytesReference;
13-
import org.elasticsearch.common.io.stream.Writeable;
14-
import org.elasticsearch.common.util.CollectionUtils;
1513
import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
1614
import org.elasticsearch.ingest.PipelineConfiguration;
17-
import org.elasticsearch.test.AbstractXContentSerializingTestCase;
15+
import org.elasticsearch.test.ESTestCase;
1816
import org.elasticsearch.xcontent.ToXContent;
1917
import org.elasticsearch.xcontent.XContentBuilder;
2018
import org.elasticsearch.xcontent.XContentParser;
2119
import org.elasticsearch.xcontent.XContentType;
2220

2321
import java.io.IOException;
24-
import java.io.UncheckedIOException;
2522
import java.util.ArrayList;
2623
import java.util.HashMap;
2724
import java.util.List;
2825
import java.util.Map;
2926

3027
import static org.elasticsearch.common.xcontent.XContentParserUtils.ensureExpectedToken;
3128

32-
public class GetPipelineResponseTests extends AbstractXContentSerializingTestCase<GetPipelineResponse> {
29+
public class GetPipelineResponseTests extends ESTestCase {
3330

3431
private XContentBuilder getRandomXContentBuilder() throws IOException {
3532
XContentType xContentType = randomFrom(XContentType.values());
@@ -83,7 +80,6 @@ public void testXContentDeserialization() throws IOException {
8380
}
8481
}
8582

86-
@Override
8783
protected GetPipelineResponse doParseInstance(XContentParser parser) throws IOException {
8884
ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser);
8985
List<PipelineConfiguration> pipelines = new ArrayList<>();
@@ -104,24 +100,4 @@ protected GetPipelineResponse doParseInstance(XContentParser parser) throws IOEx
104100
return new GetPipelineResponse(pipelines);
105101
}
106102

107-
@Override
108-
protected GetPipelineResponse createTestInstance() {
109-
try {
110-
return new GetPipelineResponse(new ArrayList<>(createPipelineConfigMap().values()));
111-
} catch (IOException e) {
112-
throw new UncheckedIOException(e);
113-
}
114-
}
115-
116-
@Override
117-
protected Writeable.Reader<GetPipelineResponse> instanceReader() {
118-
return GetPipelineResponse::new;
119-
}
120-
121-
@Override
122-
protected GetPipelineResponse mutateInstance(GetPipelineResponse response) throws IOException {
123-
return new GetPipelineResponse(
124-
CollectionUtils.appendToCopy(response.pipelines(), createRandomPipeline("pipeline_" + response.pipelines().size() + 1))
125-
);
126-
}
127103
}

x-pack/plugin/logstash/src/test/java/org/elasticsearch/xpack/logstash/action/GetPipelineRequestTests.java

Lines changed: 0 additions & 41 deletions
This file was deleted.

0 commit comments

Comments
 (0)