Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/121256.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 121256
summary: Run `TransportEnrichStatsAction` on local node
area: Ingest Node
type: enhancement
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
"params": {
"master_timeout":{
"type":"time",
"description":"Timeout for processing on master node"
"description":"Timeout for waiting for new cluster state in case it is blocked"
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,19 +10,24 @@
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.support.master.MasterNodeRequest;
import org.elasticsearch.action.support.local.LocalClusterStateRequest;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.core.UpdateForV10;
import org.elasticsearch.tasks.CancellableTask;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.tasks.TaskInfo;
import org.elasticsearch.xcontent.ToXContentFragment;
import org.elasticsearch.xcontent.ToXContentObject;
import org.elasticsearch.xcontent.XContentBuilder;

import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Objects;

public class EnrichStatsAction extends ActionType<EnrichStatsAction.Response> {
Expand All @@ -34,12 +39,17 @@ private EnrichStatsAction() {
super(NAME);
}

public static class Request extends MasterNodeRequest<Request> {
public static class Request extends LocalClusterStateRequest {

public Request(TimeValue masterNodeTimeout) {
super(masterNodeTimeout);
}

/**
* NB prior to 9.0 this was a TransportMasterNodeReadAction so for BwC we must remain able to read these requests until
* we no longer need to support calling this action remotely.
*/
@UpdateForV10(owner = UpdateForV10.Owner.DATA_MANAGEMENT)
public Request(StreamInput in) throws IOException {
super(in);
}
Expand All @@ -48,6 +58,11 @@ public Request(StreamInput in) throws IOException {
public ActionRequestValidationException validate() {
return null;
}

@Override
public Task createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
return new CancellableTask(id, type, action, "", parentTaskId, headers);
}
}

public static class Response extends ActionResponse implements ToXContentObject {
Expand All @@ -62,13 +77,6 @@ public Response(List<ExecutingPolicy> executingPolicies, List<CoordinatorStats>
this.cacheStats = cacheStats;
}

public Response(StreamInput in) throws IOException {
super(in);
executingPolicies = in.readCollectionAsList(ExecutingPolicy::new);
coordinatorStats = in.readCollectionAsList(CoordinatorStats::new);
cacheStats = in.readCollectionAsList(CacheStats::new);
}

public List<ExecutingPolicy> getExecutingPolicies() {
return executingPolicies;
}
Expand All @@ -81,6 +89,11 @@ public List<CacheStats> getCacheStats() {
return cacheStats;
}

/**
* NB prior to 9.0 this was a TransportMasterNodeReadAction so for BwC we must remain able to write these responses until
* we no longer need to support calling this action remotely.
*/
@UpdateForV10(owner = UpdateForV10.Owner.DATA_MANAGEMENT)
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeCollection(executingPolicies);
Expand Down Expand Up @@ -167,10 +180,11 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws

public record ExecutingPolicy(String name, TaskInfo taskInfo) implements Writeable, ToXContentFragment {

ExecutingPolicy(StreamInput in) throws IOException {
this(in.readString(), TaskInfo.from(in));
}

/**
* NB prior to 9.0 this was a TransportMasterNodeReadAction so for BwC we must remain able to write these responses until
* we no longer need to support calling this action remotely.
*/
@UpdateForV10(owner = UpdateForV10.Owner.DATA_MANAGEMENT)
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString(name);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.rest.ObjectPath;
import org.elasticsearch.transport.netty4.Netty4Plugin;
import org.elasticsearch.xpack.core.enrich.action.EnrichStatsAction;
import org.elasticsearch.xpack.core.enrich.action.GetEnrichPolicyAction;

import java.io.IOException;
Expand Down Expand Up @@ -62,6 +63,10 @@ public void testGetEnrichPolicyCancellation() throws IOException {
runRestActionCancellationTest(new Request(HttpGet.METHOD_NAME, "/_enrich/policy"), GetEnrichPolicyAction.NAME);
}

public void testEnrichStatsCancellation() throws IOException {
runRestActionCancellationTest(new Request(HttpGet.METHOD_NAME, "/_enrich/_stats"), EnrichStatsAction.NAME);
}

private void runRestActionCancellationTest(Request request, String actionName) {
final var node = usually() ? internalCluster().getRandomNodeName() : internalCluster().startCoordinatingOnlyNode(Settings.EMPTY);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,17 +9,18 @@
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.FailedNodeException;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
import org.elasticsearch.action.support.ChannelActionListener;
import org.elasticsearch.action.support.local.TransportLocalClusterStateAction;
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.core.UpdateForV10;
import org.elasticsearch.injection.guice.Inject;
import org.elasticsearch.tasks.CancellableTask;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.enrich.action.EnrichStatsAction;
import org.elasticsearch.xpack.core.enrich.action.EnrichStatsAction.Response.CoordinatorStats;
Expand All @@ -31,34 +32,44 @@
import java.util.Objects;
import java.util.stream.Collectors;

public class TransportEnrichStatsAction extends TransportMasterNodeAction<EnrichStatsAction.Request, EnrichStatsAction.Response> {
public class TransportEnrichStatsAction extends TransportLocalClusterStateAction<EnrichStatsAction.Request, EnrichStatsAction.Response> {

private final Client client;

/**
* NB prior to 9.0 this was a TransportMasterNodeReadAction so for BwC it must be registered with the TransportService until
* we no longer need to support calling this action remotely.
*/
@UpdateForV10(owner = UpdateForV10.Owner.DATA_MANAGEMENT)
@SuppressWarnings("this-escape")
@Inject
public TransportEnrichStatsAction(
TransportService transportService,
ClusterService clusterService,
ThreadPool threadPool,
ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver,
Client client
) {
super(
EnrichStatsAction.NAME,
transportService,
clusterService,
threadPool,
actionFilters,
EnrichStatsAction.Request::new,
EnrichStatsAction.Response::new,
transportService.getTaskManager(),
clusterService,
EsExecutors.DIRECT_EXECUTOR_SERVICE
);
this.client = client;

transportService.registerRequestHandler(
actionName,
executor,
false,
true,
EnrichStatsAction.Request::new,
(request, channel, task) -> executeDirect(task, request, new ChannelActionListener<>(channel))
);
}

@Override
protected void masterOperation(
protected void localClusterStateOperation(
Task task,
EnrichStatsAction.Request request,
ClusterState state,
Expand Down Expand Up @@ -101,6 +112,7 @@ protected void masterOperation(
.collect(Collectors.toList());
delegate.onResponse(new EnrichStatsAction.Response(policyExecutionTasks, coordinatorStats, cacheStats));
});
((CancellableTask) task).ensureNotCancelled();
client.execute(EnrichCoordinatorStatsAction.INSTANCE, statsRequest, statsListener);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.elasticsearch.rest.RestUtils;
import org.elasticsearch.rest.Scope;
import org.elasticsearch.rest.ServerlessScope;
import org.elasticsearch.rest.action.RestCancellableNodeClient;
import org.elasticsearch.rest.action.RestToXContentListener;
import org.elasticsearch.xpack.core.enrich.action.EnrichStatsAction;

Expand Down Expand Up @@ -43,7 +44,11 @@ public Set<String> supportedCapabilities() {
@Override
protected RestChannelConsumer prepareRequest(final RestRequest restRequest, final NodeClient client) {
final var request = new EnrichStatsAction.Request(RestUtils.getMasterNodeTimeout(restRequest));
return channel -> client.execute(EnrichStatsAction.INSTANCE, request, new RestToXContentListener<>(channel));
return channel -> new RestCancellableNodeClient(client, restRequest.getHttpChannel()).execute(
EnrichStatsAction.INSTANCE,
request,
new RestToXContentListener<>(channel)
);
}

}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.license.XPackLicenseState;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.tasks.TaskInfo;
import org.elasticsearch.xpack.core.enrich.action.EnrichStatsAction;
import org.elasticsearch.xpack.core.enrich.action.EnrichStatsAction.Response.CoordinatorStats;
import org.elasticsearch.xpack.core.enrich.action.EnrichStatsAction.Response.ExecutingPolicy;
Expand All @@ -21,9 +23,10 @@
import org.elasticsearch.xpack.monitoring.BaseCollectorTestCase;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

import static org.elasticsearch.xpack.enrich.action.EnrichStatsResponseTests.randomTaskInfo;
import static org.elasticsearch.xpack.monitoring.MonitoringTestUtils.randomMonitoringNode;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
Expand Down Expand Up @@ -154,4 +157,34 @@ private EnrichStatsCollector createCollector(ClusterService clusterService, XPac
return new EnrichStatsCollector(clusterService, licenseState, client);
}

public static TaskInfo randomTaskInfo() {
String nodeId = randomAlphaOfLength(5);
TaskId taskId = new TaskId(nodeId, randomLong());
String type = randomAlphaOfLength(5);
String action = randomAlphaOfLength(5);
String description = randomAlphaOfLength(5);
long startTime = randomLong();
long runningTimeNanos = randomNonNegativeLong();
boolean cancellable = randomBoolean();
boolean cancelled = cancellable && randomBoolean();
TaskId parentTaskId = TaskId.EMPTY_TASK_ID;
Map<String, String> headers = randomBoolean()
? Collections.emptyMap()
: Collections.singletonMap(randomAlphaOfLength(5), randomAlphaOfLength(5));
return new TaskInfo(
taskId,
type,
nodeId,
action,
description,
null,
startTime,
runningTimeNanos,
cancellable,
cancelled,
parentTaskId,
headers
);
}

}
Loading