diff --git a/server/src/main/java/org/elasticsearch/action/ingest/PutPipelineTransportAction.java b/server/src/main/java/org/elasticsearch/action/ingest/PutPipelineTransportAction.java index e6c1b00ccb815..5ffef099833a2 100644 --- a/server/src/main/java/org/elasticsearch/action/ingest/PutPipelineTransportAction.java +++ b/server/src/main/java/org/elasticsearch/action/ingest/PutPipelineTransportAction.java @@ -11,13 +11,9 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionType; -import org.elasticsearch.action.admin.cluster.node.info.NodesInfoMetrics; -import org.elasticsearch.action.admin.cluster.node.info.NodesInfoRequest; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.action.support.master.AcknowledgedTransportMasterNodeAction; -import org.elasticsearch.client.internal.OriginSettingClient; -import org.elasticsearch.client.internal.node.NodeClient; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.block.ClusterBlockLevel; @@ -32,12 +28,9 @@ import java.util.Optional; import java.util.Set; -import static org.elasticsearch.ingest.IngestService.INGEST_ORIGIN; - public class PutPipelineTransportAction extends AcknowledgedTransportMasterNodeAction { public static final ActionType TYPE = new ActionType<>("cluster:admin/ingest/pipeline/put"); private final IngestService ingestService; - private final OriginSettingClient client; private final ProjectResolver projectResolver; @Inject @@ -46,8 +39,7 @@ public PutPipelineTransportAction( TransportService transportService, ActionFilters actionFilters, ProjectResolver projectResolver, - IngestService ingestService, - NodeClient client + IngestService ingestService ) { super( TYPE.name(), @@ -58,9 +50,6 @@ public PutPipelineTransportAction( PutPipelineRequest::new, EsExecutors.DIRECT_EXECUTOR_SERVICE ); - // This client is only used to perform an internal implementation detail, - // so uses an internal origin context rather than the user context - this.client = new OriginSettingClient(client, INGEST_ORIGIN); this.ingestService = ingestService; this.projectResolver = projectResolver; } @@ -68,12 +57,7 @@ public PutPipelineTransportAction( @Override protected void masterOperation(Task task, PutPipelineRequest request, ClusterState state, ActionListener listener) throws Exception { - ingestService.putPipeline(projectResolver.getProjectId(), request, listener, (nodeListener) -> { - NodesInfoRequest nodesInfoRequest = new NodesInfoRequest(); - nodesInfoRequest.clear(); - nodesInfoRequest.addMetric(NodesInfoMetrics.Metric.INGEST.metricName()); - client.admin().cluster().nodesInfo(nodesInfoRequest, nodeListener); - }); + ingestService.putPipeline(projectResolver.getProjectId(), request, listener); } @Override diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestService.java b/server/src/main/java/org/elasticsearch/ingest/IngestService.java index 47ecdaf904801..1ef03325da778 100644 --- a/server/src/main/java/org/elasticsearch/ingest/IngestService.java +++ b/server/src/main/java/org/elasticsearch/ingest/IngestService.java @@ -19,6 +19,8 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.DocWriteRequest; import org.elasticsearch.action.admin.cluster.node.info.NodeInfo; +import org.elasticsearch.action.admin.cluster.node.info.NodesInfoMetrics; +import org.elasticsearch.action.admin.cluster.node.info.NodesInfoRequest; import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse; import org.elasticsearch.action.bulk.FailureStoreMetrics; import org.elasticsearch.action.bulk.IndexDocFailureStoreStatus; @@ -29,6 +31,7 @@ import org.elasticsearch.action.support.RefCountingRunnable; import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.client.internal.Client; +import org.elasticsearch.client.internal.OriginSettingClient; import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateApplier; @@ -153,11 +156,24 @@ public static boolean locallySupportedIngestFeature(NodeFeature nodeFeature) { private volatile ClusterState state; private final ProjectResolver projectResolver; private final FeatureService featureService; + private final Consumer> nodeInfoListener; private static BiFunction createScheduler(ThreadPool threadPool) { return (delay, command) -> threadPool.schedule(command, TimeValue.timeValueMillis(delay), threadPool.generic()); } + private static Consumer> createNodeInfoListener(Client client) { + // This client is only used to perform an internal implementation detail, + // so uses an internal origin context rather than the user context + final OriginSettingClient originSettingClient = new OriginSettingClient(client, INGEST_ORIGIN); + return (nodeListener) -> { + NodesInfoRequest nodesInfoRequest = new NodesInfoRequest(); + nodesInfoRequest.clear(); + nodesInfoRequest.addMetric(NodesInfoMetrics.Metric.INGEST.metricName()); + originSettingClient.admin().cluster().nodesInfo(nodesInfoRequest, nodeListener); + }; + } + public static MatcherWatchdog createGrokThreadWatchdog(Environment env, ThreadPool threadPool) { final Settings settings = env.settings(); final BiFunction scheduler = createScheduler(threadPool); @@ -240,7 +256,8 @@ public IngestService( MatcherWatchdog matcherWatchdog, FailureStoreMetrics failureStoreMetrics, ProjectResolver projectResolver, - FeatureService featureService + FeatureService featureService, + Consumer> nodeInfoListener ) { this.clusterService = clusterService; this.scriptService = scriptService; @@ -264,6 +281,36 @@ public IngestService( this.failureStoreMetrics = failureStoreMetrics; this.projectResolver = projectResolver; this.featureService = featureService; + this.nodeInfoListener = nodeInfoListener; + } + + public IngestService( + ClusterService clusterService, + ThreadPool threadPool, + Environment env, + ScriptService scriptService, + AnalysisRegistry analysisRegistry, + List ingestPlugins, + Client client, + MatcherWatchdog matcherWatchdog, + FailureStoreMetrics failureStoreMetrics, + ProjectResolver projectResolver, + FeatureService featureService + ) { + this( + clusterService, + threadPool, + env, + scriptService, + analysisRegistry, + ingestPlugins, + client, + matcherWatchdog, + failureStoreMetrics, + projectResolver, + featureService, + createNodeInfoListener(client) + ); } /** @@ -282,6 +329,7 @@ public IngestService( this.failureStoreMetrics = ingestService.failureStoreMetrics; this.projectResolver = ingestService.projectResolver; this.featureService = ingestService.featureService; + this.nodeInfoListener = ingestService.nodeInfoListener; } private static Map processorFactories(List ingestPlugins, Processor.Parameters parameters) { @@ -535,12 +583,8 @@ static List innerGetPipelines(IngestMetadata ingestMetada /** * Stores the specified pipeline definition in the request. */ - public void putPipeline( - ProjectId projectId, - PutPipelineRequest request, - ActionListener listener, - Consumer> nodeInfoListener - ) throws Exception { + public void putPipeline(ProjectId projectId, PutPipelineRequest request, ActionListener listener) + throws Exception { if (isNoOpPipelineUpdate(state.metadata().getProject(projectId), request)) { // existing pipeline matches request pipeline -- no need to update listener.onResponse(AcknowledgedResponse.TRUE); diff --git a/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java b/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java index a7bdc6a8d70a5..df3211331c7f5 100644 --- a/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java +++ b/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java @@ -3044,6 +3044,20 @@ private void testUpdatingPipeline(String pipelineString) throws Exception { Client client = mock(Client.class); ClusterService clusterService = mock(ClusterService.class); when(clusterService.state()).thenReturn(clusterState); + + var consumer = new Consumer>() { + final AtomicLong executionCount = new AtomicLong(0); + + @Override + public void accept(ActionListener nodesInfoResponseActionListener) { + executionCount.incrementAndGet(); + } + + public long getExecutionCount() { + return executionCount.get(); + } + }; + IngestService ingestService = new IngestService( clusterService, threadPool, @@ -3060,7 +3074,8 @@ private void testUpdatingPipeline(String pipelineString) throws Exception { public boolean clusterHasFeature(ClusterState state, NodeFeature feature) { return DataStream.DATA_STREAM_FAILURE_STORE_FEATURE.equals(feature); } - } + }, + consumer ); ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, clusterState)); @@ -3090,21 +3105,8 @@ public long getFailureCount() { } }; - var consumer = new Consumer>() { - final AtomicLong executionCount = new AtomicLong(0); - - @Override - public void accept(ActionListener nodesInfoResponseActionListener) { - executionCount.incrementAndGet(); - } - - public long getExecutionCount() { - return executionCount.get(); - } - }; - var request = putJsonPipelineRequest(pipelineId, pipelineString); - ingestService.putPipeline(clusterState.metadata().getProject().id(), request, listener, consumer); + ingestService.putPipeline(clusterState.metadata().getProject().id(), request, listener); latch.await(); assertThat(consumer.getExecutionCount(), equalTo(0L));