Merged · Changes from 5 commits
TransportAbstractBulkAction.java
@@ -42,6 +42,7 @@
import org.elasticsearch.index.IndexingPressure;
import org.elasticsearch.indices.SystemIndices;
import org.elasticsearch.ingest.IngestService;
import org.elasticsearch.ingest.SamplingService;
import org.elasticsearch.node.NodeClosedException;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
@@ -90,6 +91,7 @@ public abstract class TransportAbstractBulkAction extends HandledTransportAction
protected final Executor systemCoordinationExecutor;
private final ActionType<BulkResponse> bulkAction;
protected final FeatureService featureService;
protected final SamplingService samplingService;

public TransportAbstractBulkAction(
ActionType<BulkResponse> action,
@@ -103,7 +105,8 @@ public TransportAbstractBulkAction(
SystemIndices systemIndices,
ProjectResolver projectResolver,
LongSupplier relativeTimeNanosProvider,
FeatureService featureService
FeatureService featureService,
SamplingService samplingService
) {
super(action.name(), transportService, actionFilters, requestReader, EsExecutors.DIRECT_EXECUTOR_SERVICE);
this.threadPool = threadPool;
@@ -119,6 +122,7 @@ public TransportAbstractBulkAction(
clusterService.addStateApplier(this.ingestForwarder);
this.relativeTimeNanosProvider = relativeTimeNanosProvider;
this.bulkAction = action;
this.samplingService = samplingService;
}

@Override
@@ -204,13 +208,18 @@ private void forkAndExecute(Task task, BulkRequest bulkRequest, Executor executo
executor.execute(new ActionRunnable<>(releasingListener) {
@Override
protected void doRun() throws IOException {
applyPipelinesAndDoInternalExecute(task, bulkRequest, executor, releasingListener);
applyPipelinesAndDoInternalExecute(task, bulkRequest, executor, releasingListener, true);
}
});
}

private boolean applyPipelines(Task task, BulkRequest bulkRequest, Executor executor, ActionListener<BulkResponse> listener)
throws IOException {
private boolean applyPipelines(
Task task,
BulkRequest bulkRequest,
Executor executor,
ActionListener<BulkResponse> listener,
boolean firstTime
) throws IOException {
boolean hasIndexRequestsWithPipelines = false;
ClusterState state = clusterService.state();
ProjectId projectId = projectResolver.getProjectId();
@@ -303,6 +312,13 @@ private boolean applyPipelines(Task task, BulkRequest bulkRequest, Executor exec
}
});
return true;
} else if (firstTime && samplingService != null && samplingService.atLeastOneSampleConfigured()) {
// else sample, but only if this is the first time through; otherwise the request had pipelines and was already sampled in IngestService
for (DocWriteRequest<?> actionRequest : bulkRequest.requests) {
if (actionRequest instanceof IndexRequest ir) {
samplingService.maybeSample(project, ir);
}
}
}
return false;
}
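
Note on the `firstTime` flag: on the first pass through `applyPipelines`, a request that resolves no pipelines is sampled directly in the branch above, while a request with pipelines is handed to `IngestService` (which samples it there) and later re-enters this method with `firstTime = false`, so no document is sampled twice. A minimal, self-contained sketch of that guard; the class and method shapes below are illustrative, not the Elasticsearch types:

import java.util.function.Consumer;

// Illustrative model of the firstTime guard; not the actual Elasticsearch code.
final class FirstTimeSamplingSketch {
    static void applyPipelines(String doc, boolean hasPipelines, boolean firstTime, Consumer<String> sampler) {
        if (hasPipelines) {
            // The real code runs the pipelines in IngestService (which samples the document there)
            // and then re-enters with firstTime = false, so the branch below is skipped.
            applyPipelines(doc, false, false, sampler);
        } else if (firstTime) {
            sampler.accept(doc); // no pipelines at all: sample exactly once, right here
        }
    }

    public static void main(String[] args) {
        Consumer<String> sampler = doc -> System.out.println("sampled without pipelines: " + doc);
        applyPipelines("plain-doc", false, true, sampler);    // printed once
        applyPipelines("pipelined-doc", true, true, sampler); // nothing printed: ingest-side sampling is not modeled here
    }
}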
@@ -338,7 +354,7 @@ private void processBulkIndexIngestRequest(
ActionRunnable<BulkResponse> runnable = new ActionRunnable<>(actionListener) {
@Override
protected void doRun() throws IOException {
applyPipelinesAndDoInternalExecute(task, bulkRequest, executor, actionListener);
applyPipelinesAndDoInternalExecute(task, bulkRequest, executor, actionListener, false);
}

@Override
@@ -416,7 +432,8 @@ private void applyPipelinesAndDoInternalExecute(
Task task,
BulkRequest bulkRequest,
Executor executor,
ActionListener<BulkResponse> listener
ActionListener<BulkResponse> listener,
boolean firstTime
) throws IOException {
final long relativeStartTimeNanos = relativeTimeNanos();

@@ -434,7 +451,7 @@

var wrappedListener = bulkRequestModifier.wrapActionListenerIfNeeded(listener);

if (applyPipelines(task, bulkRequestModifier.getBulkRequest(), executor, wrappedListener) == false) {
if (applyPipelines(task, bulkRequestModifier.getBulkRequest(), executor, wrappedListener, firstTime) == false) {
doInternalExecute(task, bulkRequestModifier.getBulkRequest(), executor, wrappedListener, relativeStartTimeNanos);
}
}
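
The `SamplingService` members this PR relies on can be read off its call sites: `atLeastOneSampleConfigured()` above, the two-argument `maybeSample` in `applyPipelines`, and the three-argument overload used in `IngestService`. A hedged reconstruction of that surface; the import paths, parameter names, and comments are inferences from usage, not the actual declaration:

import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.cluster.metadata.ProjectMetadata;
import org.elasticsearch.ingest.IngestDocument;

// Inferred from call sites in this diff; the real class may declare more.
interface SamplingServiceSurface {
    // Pre-check so callers can skip per-document work when nothing is configured (intent inferred from usage).
    boolean atLeastOneSampleConfigured();

    // Raw path: the request bypassed ingest pipelines entirely.
    void maybeSample(ProjectMetadata project, IndexRequest request);

    // Pipeline path: both the pre-pipeline request and the transformed document are available.
    void maybeSample(ProjectMetadata project, IndexRequest original, IngestDocument transformed);
}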
TransportBulkAction.java
@@ -50,6 +50,7 @@
import org.elasticsearch.index.VersionType;
import org.elasticsearch.indices.SystemIndices;
import org.elasticsearch.ingest.IngestService;
import org.elasticsearch.ingest.SamplingService;
import org.elasticsearch.injection.guice.Inject;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
@@ -101,7 +102,8 @@ public TransportBulkAction(
ProjectResolver projectResolver,
FailureStoreMetrics failureStoreMetrics,
DataStreamFailureStoreSettings dataStreamFailureStoreSettings,
FeatureService featureService
FeatureService featureService,
SamplingService samplingService
) {
this(
threadPool,
@@ -117,7 +119,8 @@ public TransportBulkAction(
threadPool::relativeTimeInNanos,
failureStoreMetrics,
dataStreamFailureStoreSettings,
featureService
featureService,
samplingService
);
}

@@ -135,7 +138,8 @@ public TransportBulkAction(
LongSupplier relativeTimeProvider,
FailureStoreMetrics failureStoreMetrics,
DataStreamFailureStoreSettings dataStreamFailureStoreSettings,
FeatureService featureService
FeatureService featureService,
SamplingService samplingService
) {
this(
TYPE,
@@ -153,7 +157,8 @@ public TransportBulkAction(
relativeTimeProvider,
failureStoreMetrics,
dataStreamFailureStoreSettings,
featureService
featureService,
samplingService
);
}

@@ -173,7 +178,8 @@ public TransportBulkAction(
LongSupplier relativeTimeProvider,
FailureStoreMetrics failureStoreMetrics,
DataStreamFailureStoreSettings dataStreamFailureStoreSettings,
FeatureService featureService
FeatureService featureService,
SamplingService samplingService
) {
super(
bulkAction,
@@ -187,7 +193,8 @@
systemIndices,
projectResolver,
relativeTimeProvider,
featureService
featureService,
samplingService
);
this.dataStreamFailureStoreSettings = dataStreamFailureStoreSettings;
Objects.requireNonNull(relativeTimeProvider);
TransportSimulateBulkAction.java
@@ -117,7 +117,8 @@ public TransportSimulateBulkAction(
systemIndices,
projectResolver,
threadPool::relativeTimeInNanos,
featureService
featureService,
null
);
this.indicesService = indicesService;
this.xContentRegistry = xContentRegistry;
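
`TransportSimulateBulkAction` opts out of sampling by passing `null` here, which is safe because the base class only touches the field behind a `samplingService != null` check. A runnable sketch of why that guard makes the `null` harmless; the types are illustrative stand-ins, not the Elasticsearch classes:

// Illustrative model of the null guard used at the call sites (not PR code).
final class NullGuardSketch {
    interface Sampler {
        boolean atLeastOneSampleConfigured();
        void maybeSample(String doc);
    }

    static void sampleIfConfigured(Sampler samplingService, String doc) {
        // Short-circuits before dereferencing, so a null service is a no-op rather than an NPE.
        if (samplingService != null && samplingService.atLeastOneSampleConfigured()) {
            samplingService.maybeSample(doc);
        }
    }

    public static void main(String[] args) {
        sampleIfConfigured(null, "simulated-doc"); // the simulate-bulk path: nothing happens
    }
}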
86 changes: 78 additions & 8 deletions server/src/main/java/org/elasticsearch/ingest/IngestService.java
@@ -52,6 +52,7 @@
import org.elasticsearch.cluster.service.MasterServiceTaskQueue;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.TriConsumer;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.logging.DeprecationCategory;
import org.elasticsearch.common.logging.DeprecationLogger;
@@ -77,10 +78,12 @@
import org.elasticsearch.index.analysis.AnalysisRegistry;
import org.elasticsearch.node.ReportingService;
import org.elasticsearch.plugins.IngestPlugin;
import org.elasticsearch.script.Metadata;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.threadpool.Scheduler;
import org.elasticsearch.threadpool.ThreadPool;

import java.io.IOException;
import java.time.Instant;
import java.time.InstantSource;
import java.util.ArrayList;
@@ -98,6 +101,7 @@
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Consumer;
@@ -151,6 +155,7 @@ public static boolean locallySupportedIngestFeature(NodeFeature nodeFeature) {
private volatile ClusterState state;
private final ProjectResolver projectResolver;
private final FeatureService featureService;
private final SamplingService samplingService;
private final Consumer<ActionListener<NodesInfoResponse>> nodeInfoListener;

private static BiFunction<Long, Runnable, Scheduler.ScheduledCancellable> createScheduler(ThreadPool threadPool) {
@@ -252,6 +257,7 @@ public IngestService(
FailureStoreMetrics failureStoreMetrics,
ProjectResolver projectResolver,
FeatureService featureService,
SamplingService samplingService,
Consumer<ActionListener<NodesInfoResponse>> nodeInfoListener
) {
this.clusterService = clusterService;
@@ -276,6 +282,7 @@ public IngestService(
this.failureStoreMetrics = failureStoreMetrics;
this.projectResolver = projectResolver;
this.featureService = featureService;
this.samplingService = samplingService;
this.nodeInfoListener = nodeInfoListener;
}

@@ -290,7 +297,8 @@ public IngestService(
MatcherWatchdog matcherWatchdog,
FailureStoreMetrics failureStoreMetrics,
ProjectResolver projectResolver,
FeatureService featureService
FeatureService featureService,
SamplingService samplingService
) {
this(
clusterService,
@@ -304,6 +312,7 @@
failureStoreMetrics,
projectResolver,
featureService,
samplingService,
createNodeInfoListener(client)
);
}
@@ -324,6 +333,7 @@ public IngestService(
this.failureStoreMetrics = ingestService.failureStoreMetrics;
this.projectResolver = ingestService.projectResolver;
this.featureService = ingestService.featureService;
this.samplingService = ingestService.samplingService;
this.nodeInfoListener = ingestService.nodeInfoListener;
}

@@ -971,6 +981,7 @@ protected void doRun() {
Pipeline firstPipeline = pipelines.peekFirst();
if (pipelines.hasNext() == false) {
i++;
samplingService.maybeSample(state.metadata().projects().get(pipelines.projectId()), indexRequest);
continue;
}

@@ -983,7 +994,7 @@
final int slot = i;
final Releasable ref = refs.acquire();
final IngestDocument ingestDocument = newIngestDocument(indexRequest);
final org.elasticsearch.script.Metadata originalDocumentMetadata = ingestDocument.getMetadata().clone();
final Metadata originalDocumentMetadata = ingestDocument.getMetadata().clone();
// the document listener gives us three-way logic: a document can fail processing (1), or it can
// be successfully processed. a successfully processed document can be kept (2) or dropped (3).
final ActionListener<IngestPipelinesExecutionResult> documentListener = ActionListener.runAfter(
@@ -1030,7 +1041,14 @@ public void onFailure(Exception e) {
}
);

executePipelines(pipelines, indexRequest, ingestDocument, adaptedResolveFailureStore, documentListener);
executePipelines(
pipelines,
indexRequest,
ingestDocument,
adaptedResolveFailureStore,
documentListener,
originalDocumentMetadata
);
assert actionRequest.index() != null;

i++;
@@ -1149,7 +1167,8 @@ private void executePipelines(
final IndexRequest indexRequest,
final IngestDocument ingestDocument,
final Function<String, Boolean> resolveFailureStore,
final ActionListener<IngestPipelinesExecutionResult> listener
final ActionListener<IngestPipelinesExecutionResult> listener,
final Metadata originalDocumentMetadata
) {
assert pipelines.hasNext();
PipelineSlot slot = pipelines.next();
@@ -1180,12 +1199,12 @@ private void executePipelines(
listener.onFailure(e);
}
};

AtomicBoolean haveAttemptedSampling = new AtomicBoolean(false);
final var project = state.metadata().projects().get(pipelines.projectId());
try {
if (pipeline == null) {
throw new IllegalArgumentException("pipeline with id [" + pipelineId + "] does not exist");
}
final var project = state.metadata().projects().get(pipelines.projectId());
if (project == null) {
throw new IllegalArgumentException("project with id [" + pipelines.projectId() + "] does not exist");
}
@@ -1335,15 +1354,24 @@
}

if (newPipelines.hasNext()) {
executePipelines(newPipelines, indexRequest, ingestDocument, resolveFailureStore, listener);
executePipelines(newPipelines, indexRequest, ingestDocument, resolveFailureStore, listener, originalDocumentMetadata);
} else {
// update the index request's source and (potentially) cache the timestamp for TSDB
/*
* At this point, all pipelines have been executed, and we are about to overwrite ingestDocument with the results.
* This is our chance to sample with both the original document and all changes.
*/
haveAttemptedSampling.set(true);
attemptToSampleData(project, indexRequest, ingestDocument, originalDocumentMetadata);
updateIndexRequestSource(indexRequest, ingestDocument);
cacheRawTimestamp(indexRequest, ingestDocument);
listener.onResponse(IngestPipelinesExecutionResult.SUCCESSFUL_RESULT); // document succeeded!
}
});
} catch (Exception e) {
if (haveAttemptedSampling.get() == false) {
// It is possible that an exception happened after we sampled. We do not want to sample the same document twice.
attemptToSampleData(project, indexRequest, ingestDocument, originalDocumentMetadata);
}
logger.debug(
() -> format("failed to execute pipeline [%s] for document [%s/%s]", pipelineId, indexRequest.index(), indexRequest.id()),
e
@@ -1352,6 +1380,48 @@ private void executePipelines(
}
}
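
The `haveAttemptedSampling` flag above implements a sample-exactly-once contract: the success path samples after the last pipeline completes (just before `updateIndexRequestSource` overwrites the request), and the catch block samples only if that point was never reached. An `AtomicBoolean` is used because the flag is set inside the pipeline-completion callback rather than directly on the calling thread. A stripped-down, runnable model of the pattern; names and scaffolding are illustrative, not the Elasticsearch code:

import java.util.concurrent.atomic.AtomicBoolean;

// Stripped-down model of the sample-exactly-once pattern; the real flow is asynchronous.
final class SampleOnceSketch {
    static void runPipelines(Runnable pipelines, Runnable sample) {
        AtomicBoolean haveAttemptedSampling = new AtomicBoolean(false);
        try {
            pipelines.run();                  // in the real code this completes in a listener callback
            haveAttemptedSampling.set(true);  // success path: sample with original + transformed document
            sample.run();
        } catch (Exception e) {
            if (haveAttemptedSampling.get() == false) {
                sample.run();                 // failure before the success-path sample: still sample once
            }
            System.err.println("pipeline failed: " + e.getMessage());
        }
    }

    public static void main(String[] args) {
        runPipelines(() -> {}, () -> System.out.println("sampled (success path)"));
        runPipelines(() -> { throw new RuntimeException("boom"); },
                     () -> System.out.println("sampled (failure path)"));
    }
}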

private void attemptToSampleData(
ProjectMetadata projectMetadata,
IndexRequest indexRequest,
IngestDocument ingestDocument,
Metadata originalDocumentMetadata
) {
if (samplingService != null && samplingService.atLeastOneSampleConfigured()) {
try {
/*
* We need both the original document and the fully updated document for sampling, so we make a copy of the original
* before overwriting it here. We can discard it after sampling.
*/
IndexRequest original = copyIndexRequest(indexRequest);
updateIndexRequestMetadata(original, originalDocumentMetadata);
samplingService.maybeSample(projectMetadata, original, ingestDocument);
} catch (IOException ex) {
logger.warn("unable to sample data", ex);
}
}
}

private IndexRequest copyIndexRequest(IndexRequest original) throws IOException {
IndexRequest clonedRequest = new IndexRequest(original.index());
clonedRequest.id(original.id());
clonedRequest.routing(original.routing());
clonedRequest.version(original.version());
clonedRequest.versionType(original.versionType());
clonedRequest.setPipeline(original.getPipeline());
clonedRequest.setIfSeqNo(original.ifSeqNo());
clonedRequest.setIfPrimaryTerm(original.ifPrimaryTerm());
clonedRequest.setRefreshPolicy(original.getRefreshPolicy());
clonedRequest.waitForActiveShards(original.waitForActiveShards());
clonedRequest.timeout(original.timeout());
clonedRequest.opType(original.opType());
clonedRequest.setParentTask(original.getParentTask());
BytesReference source = original.source();
if (source != null) {
clonedRequest.source(source, original.getContentType());
}
return clonedRequest;
}
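
Note that `copyIndexRequest` is a field-by-field copy rather than a serialization round-trip: the source `BytesReference` is shared with the live request, not deep-copied. That is safe here because the copy is read once by `maybeSample` and then discarded. A small runnable demonstration of the sharing caveat; the types are illustrative stand-ins, not the Elasticsearch classes:

// Minimal demonstration that a field-by-field copy shares, rather than clones, the source bytes.
final class SharedSourceSketch {
    record Request(byte[] source) {}

    static Request copyRequest(Request original) {
        return new Request(original.source()); // same array reference, like the shared BytesReference above
    }

    public static void main(String[] args) {
        Request original = new Request("{\"message\":\"hello\"}".getBytes());
        Request copy = copyRequest(original);
        System.out.println(copy.source() == original.source()); // true: no deep copy is made
    }
}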

private static void executePipeline(
final IngestDocument ingestDocument,
final Pipeline pipeline,