@@ -11,13 +11,9 @@
 
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.ActionType;
-import org.elasticsearch.action.admin.cluster.node.info.NodesInfoMetrics;
-import org.elasticsearch.action.admin.cluster.node.info.NodesInfoRequest;
 import org.elasticsearch.action.support.ActionFilters;
 import org.elasticsearch.action.support.master.AcknowledgedResponse;
 import org.elasticsearch.action.support.master.AcknowledgedTransportMasterNodeAction;
-import org.elasticsearch.client.internal.OriginSettingClient;
-import org.elasticsearch.client.internal.node.NodeClient;
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.block.ClusterBlockException;
 import org.elasticsearch.cluster.block.ClusterBlockLevel;
@@ -32,12 +28,9 @@
 import java.util.Optional;
 import java.util.Set;
 
-import static org.elasticsearch.ingest.IngestService.INGEST_ORIGIN;
-
 public class PutPipelineTransportAction extends AcknowledgedTransportMasterNodeAction<PutPipelineRequest> {
     public static final ActionType<AcknowledgedResponse> TYPE = new ActionType<>("cluster:admin/ingest/pipeline/put");
     private final IngestService ingestService;
-    private final OriginSettingClient client;
     private final ProjectResolver projectResolver;
 
     @Inject
@@ -46,8 +39,7 @@ public PutPipelineTransportAction(
         TransportService transportService,
         ActionFilters actionFilters,
         ProjectResolver projectResolver,
-        IngestService ingestService,
-        NodeClient client
+        IngestService ingestService
     ) {
         super(
             TYPE.name(),
@@ -58,22 +50,14 @@ public PutPipelineTransportAction(
             PutPipelineRequest::new,
             EsExecutors.DIRECT_EXECUTOR_SERVICE
         );
-        // This client is only used to perform an internal implementation detail,
-        // so uses an internal origin context rather than the user context
-        this.client = new OriginSettingClient(client, INGEST_ORIGIN);
         this.ingestService = ingestService;
         this.projectResolver = projectResolver;
     }
 
     @Override
     protected void masterOperation(Task task, PutPipelineRequest request, ClusterState state, ActionListener<AcknowledgedResponse> listener)
         throws Exception {
-        ingestService.putPipeline(projectResolver.getProjectId(), request, listener, (nodeListener) -> {
-            NodesInfoRequest nodesInfoRequest = new NodesInfoRequest();
-            nodesInfoRequest.clear();
-            nodesInfoRequest.addMetric(NodesInfoMetrics.Metric.INGEST.metricName());
-            client.admin().cluster().nodesInfo(nodesInfoRequest, nodeListener);
-        });
+        ingestService.putPipeline(projectResolver.getProjectId(), request, listener);
     }
 
     @Override
server/src/main/java/org/elasticsearch/ingest/IngestService.java (58 changes: 51 additions & 7 deletions)
@@ -19,6 +19,8 @@
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.DocWriteRequest;
 import org.elasticsearch.action.admin.cluster.node.info.NodeInfo;
+import org.elasticsearch.action.admin.cluster.node.info.NodesInfoMetrics;
+import org.elasticsearch.action.admin.cluster.node.info.NodesInfoRequest;
 import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse;
 import org.elasticsearch.action.bulk.FailureStoreMetrics;
 import org.elasticsearch.action.bulk.IndexDocFailureStoreStatus;
@@ -29,6 +31,7 @@
 import org.elasticsearch.action.support.RefCountingRunnable;
 import org.elasticsearch.action.support.master.AcknowledgedResponse;
 import org.elasticsearch.client.internal.Client;
+import org.elasticsearch.client.internal.OriginSettingClient;
 import org.elasticsearch.cluster.ClusterChangedEvent;
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.ClusterStateApplier;
@@ -153,11 +156,24 @@ public static boolean locallySupportedIngestFeature(NodeFeature nodeFeature) {
     private volatile ClusterState state;
     private final ProjectResolver projectResolver;
     private final FeatureService featureService;
+    private final Consumer<ActionListener<NodesInfoResponse>> nodeInfoListener;
 
     private static BiFunction<Long, Runnable, Scheduler.ScheduledCancellable> createScheduler(ThreadPool threadPool) {
         return (delay, command) -> threadPool.schedule(command, TimeValue.timeValueMillis(delay), threadPool.generic());
     }
 
+    private static Consumer<ActionListener<NodesInfoResponse>> createNodeInfoListener(Client client) {
+        // This client is only used to perform an internal implementation detail,
+        // so uses an internal origin context rather than the user context
+        final OriginSettingClient originSettingClient = new OriginSettingClient(client, INGEST_ORIGIN);
+        return (nodeListener) -> {
+            NodesInfoRequest nodesInfoRequest = new NodesInfoRequest();
+            nodesInfoRequest.clear();
+            nodesInfoRequest.addMetric(NodesInfoMetrics.Metric.INGEST.metricName());
+            originSettingClient.admin().cluster().nodesInfo(nodesInfoRequest, nodeListener);
+        };
+    }
+
     public static MatcherWatchdog createGrokThreadWatchdog(Environment env, ThreadPool threadPool) {
         final Settings settings = env.settings();
         final BiFunction<Long, Runnable, Scheduler.ScheduledCancellable> scheduler = createScheduler(threadPool);
@@ -240,7 +256,8 @@ public IngestService(
         MatcherWatchdog matcherWatchdog,
         FailureStoreMetrics failureStoreMetrics,
         ProjectResolver projectResolver,
-        FeatureService featureService
+        FeatureService featureService,
+        Consumer<ActionListener<NodesInfoResponse>> nodeInfoListener
     ) {
         this.clusterService = clusterService;
         this.scriptService = scriptService;
@@ -264,6 +281,36 @@ public IngestService(
         this.failureStoreMetrics = failureStoreMetrics;
         this.projectResolver = projectResolver;
         this.featureService = featureService;
+        this.nodeInfoListener = nodeInfoListener;
     }
 
+    public IngestService(
+        ClusterService clusterService,
+        ThreadPool threadPool,
+        Environment env,
+        ScriptService scriptService,
+        AnalysisRegistry analysisRegistry,
+        List<IngestPlugin> ingestPlugins,
+        Client client,
+        MatcherWatchdog matcherWatchdog,
+        FailureStoreMetrics failureStoreMetrics,
+        ProjectResolver projectResolver,
+        FeatureService featureService
+    ) {
+        this(
+            clusterService,
+            threadPool,
+            env,
+            scriptService,
+            analysisRegistry,
+            ingestPlugins,
+            client,
+            matcherWatchdog,
+            failureStoreMetrics,
+            projectResolver,
+            featureService,
+            createNodeInfoListener(client)
+        );
+    }
+
     /**
@@ -282,6 +329,7 @@ public IngestService(
         this.failureStoreMetrics = ingestService.failureStoreMetrics;
         this.projectResolver = ingestService.projectResolver;
         this.featureService = ingestService.featureService;
+        this.nodeInfoListener = ingestService.nodeInfoListener;
     }
 
     private static Map<String, Processor.Factory> processorFactories(List<IngestPlugin> ingestPlugins, Processor.Parameters parameters) {
@@ -535,12 +583,8 @@ static List<PipelineConfiguration> innerGetPipelines(IngestMetadata ingestMetada
     /**
      * Stores the specified pipeline definition in the request.
      */
-    public void putPipeline(
-        ProjectId projectId,
-        PutPipelineRequest request,
-        ActionListener<AcknowledgedResponse> listener,
-        Consumer<ActionListener<NodesInfoResponse>> nodeInfoListener
-    ) throws Exception {
+    public void putPipeline(ProjectId projectId, PutPipelineRequest request, ActionListener<AcknowledgedResponse> listener)
+        throws Exception {
         if (isNoOpPipelineUpdate(state.metadata().getProject(projectId), request)) {
             // existing pipeline matches request pipeline -- no need to update
             listener.onResponse(AcknowledgedResponse.TRUE);
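For orientation: the consumer that IngestService now owns is what putPipeline uses further down the file, beyond the hunks shown above, to collect per-node ingest info before validating a pipeline's processors. The snippet below is only a hedged sketch of that call pattern; validatePipelineOnNodes and submitPipelineUpdateTask are hypothetical stand-ins, not methods confirmed by this diff.

```java
// Hedged sketch: roughly how the injected nodeInfoListener is consumed inside putPipeline.
// The real method body is outside the captured hunks; the helper names are placeholders.
nodeInfoListener.accept(ActionListener.wrap(nodesInfoResponse -> {
    Map<DiscoveryNode, IngestInfo> ingestInfos = new HashMap<>();
    for (NodeInfo nodeInfo : nodesInfoResponse.getNodes()) {
        ingestInfos.put(nodeInfo.getNode(), nodeInfo.getInfo(IngestInfo.class));
    }
    validatePipelineOnNodes(ingestInfos, projectId, request); // hypothetical validation step
    submitPipelineUpdateTask(projectId, request, listener);   // hypothetical cluster-state update
}, listener::onFailure));
```

Because the lookup is now wired in through the constructor, PutPipelineTransportAction no longer needs its own NodeClient or OriginSettingClient, and tests can substitute the consumer directly, as the test hunks below do.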
(third changed file: ingest service unit tests)
@@ -3044,6 +3044,20 @@ private void testUpdatingPipeline(String pipelineString) throws Exception {
         Client client = mock(Client.class);
         ClusterService clusterService = mock(ClusterService.class);
         when(clusterService.state()).thenReturn(clusterState);
+
+        var consumer = new Consumer<ActionListener<NodesInfoResponse>>() {
+            final AtomicLong executionCount = new AtomicLong(0);
+
+            @Override
+            public void accept(ActionListener<NodesInfoResponse> nodesInfoResponseActionListener) {
+                executionCount.incrementAndGet();
+            }
+
+            public long getExecutionCount() {
+                return executionCount.get();
+            }
+        };
+
         IngestService ingestService = new IngestService(
             clusterService,
             threadPool,
@@ -3060,7 +3074,8 @@ private void testUpdatingPipeline(String pipelineString) throws Exception {
                 public boolean clusterHasFeature(ClusterState state, NodeFeature feature) {
                     return DataStream.DATA_STREAM_FAILURE_STORE_FEATURE.equals(feature);
                 }
-            }
+            },
+            consumer
         );
         ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, clusterState));
 
@@ -3090,21 +3105,8 @@ public long getFailureCount() {
             }
         };
 
-        var consumer = new Consumer<ActionListener<NodesInfoResponse>>() {
-            final AtomicLong executionCount = new AtomicLong(0);
-
-            @Override
-            public void accept(ActionListener<NodesInfoResponse> nodesInfoResponseActionListener) {
-                executionCount.incrementAndGet();
-            }
-
-            public long getExecutionCount() {
-                return executionCount.get();
-            }
-        };
-
         var request = putJsonPipelineRequest(pipelineId, pipelineString);
-        ingestService.putPipeline(clusterState.metadata().getProject().id(), request, listener, consumer);
+        ingestService.putPipeline(clusterState.metadata().getProject().id(), request, listener);
         latch.await();
 
         assertThat(consumer.getExecutionCount(), equalTo(0L));
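The anonymous consumer in the test above records invocations without ever touching a NodeClient, which is exactly what the constructor change enables. A standalone equivalent is sketched below; the class name and the canned-response remark are additions for illustration, not part of the PR.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse;

// Minimal sketch: a reusable stand-in for the anonymous node-info consumer in the test above.
final class CountingNodeInfoConsumer implements Consumer<ActionListener<NodesInfoResponse>> {
    private final AtomicLong executionCount = new AtomicLong();

    @Override
    public void accept(ActionListener<NodesInfoResponse> nodeListener) {
        // Record the call only; a richer stub could complete nodeListener with a canned response.
        executionCount.incrementAndGet();
    }

    long executionCount() {
        return executionCount.get();
    }
}
```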