Skip to content

Commit 9521d7e

Browse files
authored
Merge branch 'main' into ES-10641_add_leak_detection_to_store
2 parents faced46 + e27a50d commit 9521d7e

File tree

11 files changed

+109
-136
lines changed

11 files changed

+109
-136
lines changed

docs/changelog/121256.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 121256
2+
summary: Run `TransportEnrichStatsAction` on local node
3+
area: Ingest Node
4+
type: enhancement
5+
issues: []

muted-tests.yml

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -122,9 +122,6 @@ tests:
122122
- class: org.elasticsearch.test.rest.yaml.CcsCommonYamlTestSuiteIT
123123
method: test {p0=search.highlight/50_synthetic_source/text multi unified from vectors}
124124
issue: https://github.com/elastic/elasticsearch/issues/117815
125-
- class: org.elasticsearch.xpack.esql.plugin.ClusterRequestTests
126-
method: testFallbackIndicesOptions
127-
issue: https://github.com/elastic/elasticsearch/issues/117937
128125
- class: org.elasticsearch.xpack.ml.integration.RegressionIT
129126
method: testTwoJobsWithSameRandomizeSeedUseSameTrainingSet
130127
issue: https://github.com/elastic/elasticsearch/issues/117805

rest-api-spec/src/main/resources/rest-api-spec/api/enrich.stats.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
"params": {
2121
"master_timeout":{
2222
"type":"time",
23-
"description":"Timeout for processing on master node"
23+
"description":"Timeout for waiting for new cluster state in case it is blocked"
2424
}
2525
}
2626
}

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/enrich/action/EnrichStatsAction.java

Lines changed: 27 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -10,19 +10,24 @@
1010
import org.elasticsearch.action.ActionRequestValidationException;
1111
import org.elasticsearch.action.ActionResponse;
1212
import org.elasticsearch.action.ActionType;
13-
import org.elasticsearch.action.support.master.MasterNodeRequest;
13+
import org.elasticsearch.action.support.local.LocalClusterStateRequest;
1414
import org.elasticsearch.common.io.stream.StreamInput;
1515
import org.elasticsearch.common.io.stream.StreamOutput;
1616
import org.elasticsearch.common.io.stream.Writeable;
1717
import org.elasticsearch.common.unit.ByteSizeValue;
1818
import org.elasticsearch.core.TimeValue;
19+
import org.elasticsearch.core.UpdateForV10;
20+
import org.elasticsearch.tasks.CancellableTask;
21+
import org.elasticsearch.tasks.Task;
22+
import org.elasticsearch.tasks.TaskId;
1923
import org.elasticsearch.tasks.TaskInfo;
2024
import org.elasticsearch.xcontent.ToXContentFragment;
2125
import org.elasticsearch.xcontent.ToXContentObject;
2226
import org.elasticsearch.xcontent.XContentBuilder;
2327

2428
import java.io.IOException;
2529
import java.util.List;
30+
import java.util.Map;
2631
import java.util.Objects;
2732

2833
public class EnrichStatsAction extends ActionType<EnrichStatsAction.Response> {
@@ -34,12 +39,17 @@ private EnrichStatsAction() {
3439
super(NAME);
3540
}
3641

37-
public static class Request extends MasterNodeRequest<Request> {
42+
public static class Request extends LocalClusterStateRequest {
3843

3944
public Request(TimeValue masterNodeTimeout) {
4045
super(masterNodeTimeout);
4146
}
4247

48+
/**
49+
* NB prior to 9.0 this was a TransportMasterNodeReadAction so for BwC we must remain able to read these requests until
50+
* we no longer need to support calling this action remotely.
51+
*/
52+
@UpdateForV10(owner = UpdateForV10.Owner.DATA_MANAGEMENT)
4353
public Request(StreamInput in) throws IOException {
4454
super(in);
4555
}
@@ -48,6 +58,11 @@ public Request(StreamInput in) throws IOException {
4858
public ActionRequestValidationException validate() {
4959
return null;
5060
}
61+
62+
@Override
63+
public Task createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
64+
return new CancellableTask(id, type, action, "", parentTaskId, headers);
65+
}
5166
}
5267

5368
public static class Response extends ActionResponse implements ToXContentObject {
@@ -62,13 +77,6 @@ public Response(List<ExecutingPolicy> executingPolicies, List<CoordinatorStats>
6277
this.cacheStats = cacheStats;
6378
}
6479

65-
public Response(StreamInput in) throws IOException {
66-
super(in);
67-
executingPolicies = in.readCollectionAsList(ExecutingPolicy::new);
68-
coordinatorStats = in.readCollectionAsList(CoordinatorStats::new);
69-
cacheStats = in.readCollectionAsList(CacheStats::new);
70-
}
71-
7280
public List<ExecutingPolicy> getExecutingPolicies() {
7381
return executingPolicies;
7482
}
@@ -81,6 +89,11 @@ public List<CacheStats> getCacheStats() {
8189
return cacheStats;
8290
}
8391

92+
/**
93+
* NB prior to 9.0 this was a TransportMasterNodeReadAction so for BwC we must remain able to write these responses until
94+
* we no longer need to support calling this action remotely.
95+
*/
96+
@UpdateForV10(owner = UpdateForV10.Owner.DATA_MANAGEMENT)
8497
@Override
8598
public void writeTo(StreamOutput out) throws IOException {
8699
out.writeCollection(executingPolicies);
@@ -167,10 +180,11 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
167180

168181
public record ExecutingPolicy(String name, TaskInfo taskInfo) implements Writeable, ToXContentFragment {
169182

170-
ExecutingPolicy(StreamInput in) throws IOException {
171-
this(in.readString(), TaskInfo.from(in));
172-
}
173-
183+
/**
184+
* NB prior to 9.0 this was a TransportMasterNodeReadAction so for BwC we must remain able to write these responses until
185+
* we no longer need to support calling this action remotely.
186+
*/
187+
@UpdateForV10(owner = UpdateForV10.Owner.DATA_MANAGEMENT)
174188
@Override
175189
public void writeTo(StreamOutput out) throws IOException {
176190
out.writeString(name);

x-pack/plugin/enrich/src/internalClusterTest/java/org/elasticsearch/xpack/enrich/EnrichRestActionCancellationIT.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.elasticsearch.test.ESIntegTestCase;
2424
import org.elasticsearch.test.rest.ObjectPath;
2525
import org.elasticsearch.transport.netty4.Netty4Plugin;
26+
import org.elasticsearch.xpack.core.enrich.action.EnrichStatsAction;
2627
import org.elasticsearch.xpack.core.enrich.action.GetEnrichPolicyAction;
2728

2829
import java.io.IOException;
@@ -62,6 +63,10 @@ public void testGetEnrichPolicyCancellation() throws IOException {
6263
runRestActionCancellationTest(new Request(HttpGet.METHOD_NAME, "/_enrich/policy"), GetEnrichPolicyAction.NAME);
6364
}
6465

66+
public void testEnrichStatsCancellation() throws IOException {
67+
runRestActionCancellationTest(new Request(HttpGet.METHOD_NAME, "/_enrich/_stats"), EnrichStatsAction.NAME);
68+
}
69+
6570
private void runRestActionCancellationTest(Request request, String actionName) {
6671
final var node = usually() ? internalCluster().getRandomNodeName() : internalCluster().startCoordinatingOnlyNode(Settings.EMPTY);
6772

x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/action/TransportEnrichStatsAction.java

Lines changed: 24 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -9,17 +9,18 @@
99
import org.elasticsearch.action.ActionListener;
1010
import org.elasticsearch.action.FailedNodeException;
1111
import org.elasticsearch.action.support.ActionFilters;
12-
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
12+
import org.elasticsearch.action.support.ChannelActionListener;
13+
import org.elasticsearch.action.support.local.TransportLocalClusterStateAction;
1314
import org.elasticsearch.client.internal.Client;
1415
import org.elasticsearch.cluster.ClusterState;
1516
import org.elasticsearch.cluster.block.ClusterBlockException;
1617
import org.elasticsearch.cluster.block.ClusterBlockLevel;
17-
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
1818
import org.elasticsearch.cluster.service.ClusterService;
1919
import org.elasticsearch.common.util.concurrent.EsExecutors;
20+
import org.elasticsearch.core.UpdateForV10;
2021
import org.elasticsearch.injection.guice.Inject;
22+
import org.elasticsearch.tasks.CancellableTask;
2123
import org.elasticsearch.tasks.Task;
22-
import org.elasticsearch.threadpool.ThreadPool;
2324
import org.elasticsearch.transport.TransportService;
2425
import org.elasticsearch.xpack.core.enrich.action.EnrichStatsAction;
2526
import org.elasticsearch.xpack.core.enrich.action.EnrichStatsAction.Response.CoordinatorStats;
@@ -31,34 +32,44 @@
3132
import java.util.Objects;
3233
import java.util.stream.Collectors;
3334

34-
public class TransportEnrichStatsAction extends TransportMasterNodeAction<EnrichStatsAction.Request, EnrichStatsAction.Response> {
35+
public class TransportEnrichStatsAction extends TransportLocalClusterStateAction<EnrichStatsAction.Request, EnrichStatsAction.Response> {
3536

3637
private final Client client;
3738

39+
/**
40+
* NB prior to 9.0 this was a TransportMasterNodeReadAction so for BwC it must be registered with the TransportService until
41+
* we no longer need to support calling this action remotely.
42+
*/
43+
@UpdateForV10(owner = UpdateForV10.Owner.DATA_MANAGEMENT)
44+
@SuppressWarnings("this-escape")
3845
@Inject
3946
public TransportEnrichStatsAction(
4047
TransportService transportService,
4148
ClusterService clusterService,
42-
ThreadPool threadPool,
4349
ActionFilters actionFilters,
44-
IndexNameExpressionResolver indexNameExpressionResolver,
4550
Client client
4651
) {
4752
super(
4853
EnrichStatsAction.NAME,
49-
transportService,
50-
clusterService,
51-
threadPool,
5254
actionFilters,
53-
EnrichStatsAction.Request::new,
54-
EnrichStatsAction.Response::new,
55+
transportService.getTaskManager(),
56+
clusterService,
5557
EsExecutors.DIRECT_EXECUTOR_SERVICE
5658
);
5759
this.client = client;
60+
61+
transportService.registerRequestHandler(
62+
actionName,
63+
executor,
64+
false,
65+
true,
66+
EnrichStatsAction.Request::new,
67+
(request, channel, task) -> executeDirect(task, request, new ChannelActionListener<>(channel))
68+
);
5869
}
5970

6071
@Override
61-
protected void masterOperation(
72+
protected void localClusterStateOperation(
6273
Task task,
6374
EnrichStatsAction.Request request,
6475
ClusterState state,
@@ -101,6 +112,7 @@ protected void masterOperation(
101112
.collect(Collectors.toList());
102113
delegate.onResponse(new EnrichStatsAction.Response(policyExecutionTasks, coordinatorStats, cacheStats));
103114
});
115+
((CancellableTask) task).ensureNotCancelled();
104116
client.execute(EnrichCoordinatorStatsAction.INSTANCE, statsRequest, statsListener);
105117
}
106118

x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/rest/RestEnrichStatsAction.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import org.elasticsearch.rest.RestUtils;
1313
import org.elasticsearch.rest.Scope;
1414
import org.elasticsearch.rest.ServerlessScope;
15+
import org.elasticsearch.rest.action.RestCancellableNodeClient;
1516
import org.elasticsearch.rest.action.RestToXContentListener;
1617
import org.elasticsearch.xpack.core.enrich.action.EnrichStatsAction;
1718

@@ -43,7 +44,11 @@ public Set<String> supportedCapabilities() {
4344
@Override
4445
protected RestChannelConsumer prepareRequest(final RestRequest restRequest, final NodeClient client) {
4546
final var request = new EnrichStatsAction.Request(RestUtils.getMasterNodeTimeout(restRequest));
46-
return channel -> client.execute(EnrichStatsAction.INSTANCE, request, new RestToXContentListener<>(channel));
47+
return channel -> new RestCancellableNodeClient(client, restRequest.getHttpChannel()).execute(
48+
EnrichStatsAction.INSTANCE,
49+
request,
50+
new RestToXContentListener<>(channel)
51+
);
4752
}
4853

4954
}

x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/action/EnrichStatsResponseTests.java

Lines changed: 0 additions & 101 deletions
This file was deleted.

0 commit comments

Comments
 (0)