Skip to content

Commit 57de3e5

Browse files
committed
Refactor IngestService node info collection
1 parent 9890f98 commit 57de3e5

File tree

3 files changed

+70
-40
lines changed

3 files changed

+70
-40
lines changed

server/src/main/java/org/elasticsearch/action/ingest/PutPipelineTransportAction.java

Lines changed: 2 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -11,13 +11,9 @@
1111

1212
import org.elasticsearch.action.ActionListener;
1313
import org.elasticsearch.action.ActionType;
14-
import org.elasticsearch.action.admin.cluster.node.info.NodesInfoMetrics;
15-
import org.elasticsearch.action.admin.cluster.node.info.NodesInfoRequest;
1614
import org.elasticsearch.action.support.ActionFilters;
1715
import org.elasticsearch.action.support.master.AcknowledgedResponse;
1816
import org.elasticsearch.action.support.master.AcknowledgedTransportMasterNodeAction;
19-
import org.elasticsearch.client.internal.OriginSettingClient;
20-
import org.elasticsearch.client.internal.node.NodeClient;
2117
import org.elasticsearch.cluster.ClusterState;
2218
import org.elasticsearch.cluster.block.ClusterBlockException;
2319
import org.elasticsearch.cluster.block.ClusterBlockLevel;
@@ -32,12 +28,9 @@
3228
import java.util.Optional;
3329
import java.util.Set;
3430

35-
import static org.elasticsearch.ingest.IngestService.INGEST_ORIGIN;
36-
3731
public class PutPipelineTransportAction extends AcknowledgedTransportMasterNodeAction<PutPipelineRequest> {
3832
public static final ActionType<AcknowledgedResponse> TYPE = new ActionType<>("cluster:admin/ingest/pipeline/put");
3933
private final IngestService ingestService;
40-
private final OriginSettingClient client;
4134
private final ProjectResolver projectResolver;
4235

4336
@Inject
@@ -46,8 +39,7 @@ public PutPipelineTransportAction(
4639
TransportService transportService,
4740
ActionFilters actionFilters,
4841
ProjectResolver projectResolver,
49-
IngestService ingestService,
50-
NodeClient client
42+
IngestService ingestService
5143
) {
5244
super(
5345
TYPE.name(),
@@ -58,22 +50,14 @@ public PutPipelineTransportAction(
5850
PutPipelineRequest::new,
5951
EsExecutors.DIRECT_EXECUTOR_SERVICE
6052
);
61-
// This client is only used to perform an internal implementation detail,
62-
// so uses an internal origin context rather than the user context
63-
this.client = new OriginSettingClient(client, INGEST_ORIGIN);
6453
this.ingestService = ingestService;
6554
this.projectResolver = projectResolver;
6655
}
6756

6857
@Override
6958
protected void masterOperation(Task task, PutPipelineRequest request, ClusterState state, ActionListener<AcknowledgedResponse> listener)
7059
throws Exception {
71-
ingestService.putPipeline(projectResolver.getProjectId(), request, listener, (nodeListener) -> {
72-
NodesInfoRequest nodesInfoRequest = new NodesInfoRequest();
73-
nodesInfoRequest.clear();
74-
nodesInfoRequest.addMetric(NodesInfoMetrics.Metric.INGEST.metricName());
75-
client.admin().cluster().nodesInfo(nodesInfoRequest, nodeListener);
76-
});
60+
ingestService.putPipeline(projectResolver.getProjectId(), request, listener);
7761
}
7862

7963
@Override

server/src/main/java/org/elasticsearch/ingest/IngestService.java

Lines changed: 51 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919
import org.elasticsearch.action.ActionListener;
2020
import org.elasticsearch.action.DocWriteRequest;
2121
import org.elasticsearch.action.admin.cluster.node.info.NodeInfo;
22+
import org.elasticsearch.action.admin.cluster.node.info.NodesInfoMetrics;
23+
import org.elasticsearch.action.admin.cluster.node.info.NodesInfoRequest;
2224
import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse;
2325
import org.elasticsearch.action.bulk.FailureStoreMetrics;
2426
import org.elasticsearch.action.bulk.IndexDocFailureStoreStatus;
@@ -29,6 +31,7 @@
2931
import org.elasticsearch.action.support.RefCountingRunnable;
3032
import org.elasticsearch.action.support.master.AcknowledgedResponse;
3133
import org.elasticsearch.client.internal.Client;
34+
import org.elasticsearch.client.internal.OriginSettingClient;
3235
import org.elasticsearch.cluster.ClusterChangedEvent;
3336
import org.elasticsearch.cluster.ClusterState;
3437
import org.elasticsearch.cluster.ClusterStateApplier;
@@ -153,11 +156,24 @@ public static boolean locallySupportedIngestFeature(NodeFeature nodeFeature) {
153156
private volatile ClusterState state;
154157
private final ProjectResolver projectResolver;
155158
private final FeatureService featureService;
159+
private final Consumer<ActionListener<NodesInfoResponse>> nodeInfoListener;
156160

157161
private static BiFunction<Long, Runnable, Scheduler.ScheduledCancellable> createScheduler(ThreadPool threadPool) {
158162
return (delay, command) -> threadPool.schedule(command, TimeValue.timeValueMillis(delay), threadPool.generic());
159163
}
160164

165+
private static Consumer<ActionListener<NodesInfoResponse>> createNodeInfoListener(Client client) {
166+
// This client is only used to perform an internal implementation detail,
167+
// so uses an internal origin context rather than the user context
168+
final OriginSettingClient originSettingClient = new OriginSettingClient(client, INGEST_ORIGIN);
169+
return (nodeListener) -> {
170+
NodesInfoRequest nodesInfoRequest = new NodesInfoRequest();
171+
nodesInfoRequest.clear();
172+
nodesInfoRequest.addMetric(NodesInfoMetrics.Metric.INGEST.metricName());
173+
originSettingClient.admin().cluster().nodesInfo(nodesInfoRequest, nodeListener);
174+
};
175+
}
176+
161177
public static MatcherWatchdog createGrokThreadWatchdog(Environment env, ThreadPool threadPool) {
162178
final Settings settings = env.settings();
163179
final BiFunction<Long, Runnable, Scheduler.ScheduledCancellable> scheduler = createScheduler(threadPool);
@@ -240,7 +256,8 @@ public IngestService(
240256
MatcherWatchdog matcherWatchdog,
241257
FailureStoreMetrics failureStoreMetrics,
242258
ProjectResolver projectResolver,
243-
FeatureService featureService
259+
FeatureService featureService,
260+
Consumer<ActionListener<NodesInfoResponse>> nodeInfoListener
244261
) {
245262
this.clusterService = clusterService;
246263
this.scriptService = scriptService;
@@ -264,6 +281,36 @@ public IngestService(
264281
this.failureStoreMetrics = failureStoreMetrics;
265282
this.projectResolver = projectResolver;
266283
this.featureService = featureService;
284+
this.nodeInfoListener = nodeInfoListener;
285+
}
286+
287+
public IngestService(
288+
ClusterService clusterService,
289+
ThreadPool threadPool,
290+
Environment env,
291+
ScriptService scriptService,
292+
AnalysisRegistry analysisRegistry,
293+
List<IngestPlugin> ingestPlugins,
294+
Client client,
295+
MatcherWatchdog matcherWatchdog,
296+
FailureStoreMetrics failureStoreMetrics,
297+
ProjectResolver projectResolver,
298+
FeatureService featureService
299+
) {
300+
this(
301+
clusterService,
302+
threadPool,
303+
env,
304+
scriptService,
305+
analysisRegistry,
306+
ingestPlugins,
307+
client,
308+
matcherWatchdog,
309+
failureStoreMetrics,
310+
projectResolver,
311+
featureService,
312+
createNodeInfoListener(client)
313+
);
267314
}
268315

269316
/**
@@ -282,6 +329,7 @@ public IngestService(
282329
this.failureStoreMetrics = ingestService.failureStoreMetrics;
283330
this.projectResolver = ingestService.projectResolver;
284331
this.featureService = ingestService.featureService;
332+
this.nodeInfoListener = ingestService.nodeInfoListener;
285333
}
286334

287335
private static Map<String, Processor.Factory> processorFactories(List<IngestPlugin> ingestPlugins, Processor.Parameters parameters) {
@@ -535,12 +583,8 @@ static List<PipelineConfiguration> innerGetPipelines(IngestMetadata ingestMetada
535583
/**
536584
* Stores the specified pipeline definition in the request.
537585
*/
538-
public void putPipeline(
539-
ProjectId projectId,
540-
PutPipelineRequest request,
541-
ActionListener<AcknowledgedResponse> listener,
542-
Consumer<ActionListener<NodesInfoResponse>> nodeInfoListener
543-
) throws Exception {
586+
public void putPipeline(ProjectId projectId, PutPipelineRequest request, ActionListener<AcknowledgedResponse> listener)
587+
throws Exception {
544588
if (isNoOpPipelineUpdate(state.metadata().getProject(projectId), request)) {
545589
// existing pipeline matches request pipeline -- no need to update
546590
listener.onResponse(AcknowledgedResponse.TRUE);

server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java

Lines changed: 17 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -3044,6 +3044,20 @@ private void testUpdatingPipeline(String pipelineString) throws Exception {
30443044
Client client = mock(Client.class);
30453045
ClusterService clusterService = mock(ClusterService.class);
30463046
when(clusterService.state()).thenReturn(clusterState);
3047+
3048+
var consumer = new Consumer<ActionListener<NodesInfoResponse>>() {
3049+
final AtomicLong executionCount = new AtomicLong(0);
3050+
3051+
@Override
3052+
public void accept(ActionListener<NodesInfoResponse> nodesInfoResponseActionListener) {
3053+
executionCount.incrementAndGet();
3054+
}
3055+
3056+
public long getExecutionCount() {
3057+
return executionCount.get();
3058+
}
3059+
};
3060+
30473061
IngestService ingestService = new IngestService(
30483062
clusterService,
30493063
threadPool,
@@ -3060,7 +3074,8 @@ private void testUpdatingPipeline(String pipelineString) throws Exception {
30603074
public boolean clusterHasFeature(ClusterState state, NodeFeature feature) {
30613075
return DataStream.DATA_STREAM_FAILURE_STORE_FEATURE.equals(feature);
30623076
}
3063-
}
3077+
},
3078+
consumer
30643079
);
30653080
ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, clusterState));
30663081

@@ -3090,21 +3105,8 @@ public long getFailureCount() {
30903105
}
30913106
};
30923107

3093-
var consumer = new Consumer<ActionListener<NodesInfoResponse>>() {
3094-
final AtomicLong executionCount = new AtomicLong(0);
3095-
3096-
@Override
3097-
public void accept(ActionListener<NodesInfoResponse> nodesInfoResponseActionListener) {
3098-
executionCount.incrementAndGet();
3099-
}
3100-
3101-
public long getExecutionCount() {
3102-
return executionCount.get();
3103-
}
3104-
};
3105-
31063108
var request = putJsonPipelineRequest(pipelineId, pipelineString);
3107-
ingestService.putPipeline(clusterState.metadata().getProject().id(), request, listener, consumer);
3109+
ingestService.putPipeline(clusterState.metadata().getProject().id(), request, listener);
31083110
latch.await();
31093111

31103112
assertThat(consumer.getExecutionCount(), equalTo(0L));

0 commit comments

Comments
 (0)