1919import org .elasticsearch .action .ActionListener ;
2020import org .elasticsearch .action .DocWriteRequest ;
2121import org .elasticsearch .action .admin .cluster .node .info .NodeInfo ;
22+ import org .elasticsearch .action .admin .cluster .node .info .NodesInfoMetrics ;
23+ import org .elasticsearch .action .admin .cluster .node .info .NodesInfoRequest ;
2224import org .elasticsearch .action .admin .cluster .node .info .NodesInfoResponse ;
2325import org .elasticsearch .action .bulk .FailureStoreMetrics ;
2426import org .elasticsearch .action .bulk .IndexDocFailureStoreStatus ;
2931import org .elasticsearch .action .support .RefCountingRunnable ;
3032import org .elasticsearch .action .support .master .AcknowledgedResponse ;
3133import org .elasticsearch .client .internal .Client ;
34+ import org .elasticsearch .client .internal .OriginSettingClient ;
3235import org .elasticsearch .cluster .ClusterChangedEvent ;
3336import org .elasticsearch .cluster .ClusterState ;
3437import org .elasticsearch .cluster .ClusterStateApplier ;
@@ -153,11 +156,24 @@ public static boolean locallySupportedIngestFeature(NodeFeature nodeFeature) {
153156 private volatile ClusterState state ;
154157 private final ProjectResolver projectResolver ;
155158 private final FeatureService featureService ;
159+ private final Consumer <ActionListener <NodesInfoResponse >> nodeInfoListener ;
156160
157161 private static BiFunction <Long , Runnable , Scheduler .ScheduledCancellable > createScheduler (ThreadPool threadPool ) {
158162 return (delay , command ) -> threadPool .schedule (command , TimeValue .timeValueMillis (delay ), threadPool .generic ());
159163 }
160164
165+ private static Consumer <ActionListener <NodesInfoResponse >> createNodeInfoListener (Client client ) {
166+ // This client is only used to perform an internal implementation detail,
167+ // so uses an internal origin context rather than the user context
168+ final OriginSettingClient originSettingClient = new OriginSettingClient (client , INGEST_ORIGIN );
169+ return (nodeListener ) -> {
170+ NodesInfoRequest nodesInfoRequest = new NodesInfoRequest ();
171+ nodesInfoRequest .clear ();
172+ nodesInfoRequest .addMetric (NodesInfoMetrics .Metric .INGEST .metricName ());
173+ originSettingClient .admin ().cluster ().nodesInfo (nodesInfoRequest , nodeListener );
174+ };
175+ }
176+
161177 public static MatcherWatchdog createGrokThreadWatchdog (Environment env , ThreadPool threadPool ) {
162178 final Settings settings = env .settings ();
163179 final BiFunction <Long , Runnable , Scheduler .ScheduledCancellable > scheduler = createScheduler (threadPool );
@@ -240,7 +256,8 @@ public IngestService(
240256 MatcherWatchdog matcherWatchdog ,
241257 FailureStoreMetrics failureStoreMetrics ,
242258 ProjectResolver projectResolver ,
243- FeatureService featureService
259+ FeatureService featureService ,
260+ Consumer <ActionListener <NodesInfoResponse >> nodeInfoListener
244261 ) {
245262 this .clusterService = clusterService ;
246263 this .scriptService = scriptService ;
@@ -264,6 +281,36 @@ public IngestService(
264281 this .failureStoreMetrics = failureStoreMetrics ;
265282 this .projectResolver = projectResolver ;
266283 this .featureService = featureService ;
284+ this .nodeInfoListener = nodeInfoListener ;
285+ }
286+
287+ public IngestService (
288+ ClusterService clusterService ,
289+ ThreadPool threadPool ,
290+ Environment env ,
291+ ScriptService scriptService ,
292+ AnalysisRegistry analysisRegistry ,
293+ List <IngestPlugin > ingestPlugins ,
294+ Client client ,
295+ MatcherWatchdog matcherWatchdog ,
296+ FailureStoreMetrics failureStoreMetrics ,
297+ ProjectResolver projectResolver ,
298+ FeatureService featureService
299+ ) {
300+ this (
301+ clusterService ,
302+ threadPool ,
303+ env ,
304+ scriptService ,
305+ analysisRegistry ,
306+ ingestPlugins ,
307+ client ,
308+ matcherWatchdog ,
309+ failureStoreMetrics ,
310+ projectResolver ,
311+ featureService ,
312+ createNodeInfoListener (client )
313+ );
267314 }
268315
269316 /**
@@ -282,6 +329,7 @@ public IngestService(
282329 this .failureStoreMetrics = ingestService .failureStoreMetrics ;
283330 this .projectResolver = ingestService .projectResolver ;
284331 this .featureService = ingestService .featureService ;
332+ this .nodeInfoListener = ingestService .nodeInfoListener ;
285333 }
286334
287335 private static Map <String , Processor .Factory > processorFactories (List <IngestPlugin > ingestPlugins , Processor .Parameters parameters ) {
@@ -535,12 +583,8 @@ static List<PipelineConfiguration> innerGetPipelines(IngestMetadata ingestMetada
535583 /**
536584 * Stores the specified pipeline definition in the request.
537585 */
538- public void putPipeline (
539- ProjectId projectId ,
540- PutPipelineRequest request ,
541- ActionListener <AcknowledgedResponse > listener ,
542- Consumer <ActionListener <NodesInfoResponse >> nodeInfoListener
543- ) throws Exception {
586+ public void putPipeline (ProjectId projectId , PutPipelineRequest request , ActionListener <AcknowledgedResponse > listener )
587+ throws Exception {
544588 if (isNoOpPipelineUpdate (state .metadata ().getProject (projectId ), request )) {
545589 // existing pipeline matches request pipeline -- no need to update
546590 listener .onResponse (AcknowledgedResponse .TRUE );
0 commit comments