Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Implement FieldMappingIngestionMessageMapper for pull-based ingestion ([#20729](https://github.com/opensearch-project/OpenSearch/pull/20729))
- Added support of WarmerRefreshListener in NRTReplicationEngine to trigger warmer after replication on replica shards ([#20650](https://github.com/opensearch-project/OpenSearch/pull/20650))
- WLM group custom search settings - groundwork and timeout ([#20536](https://github.com/opensearch-project/OpenSearch/issues/20536))
- Add ingest pipeline support for pull-based ingestion ([#20873](https://github.com/opensearch-project/OpenSearch/issues/20873))
- Expose JVM runtime metrics via telemetry framework ([#20844](https://github.com/opensearch-project/OpenSearch/pull/20844))
- Add intra segment support for single-value metric aggregations ([#20503](https://github.com/opensearch-project/OpenSearch/pull/20503))
- Add ref_path support for package-based hunspell dictionary loading ([#20840](https://github.com/opensearch-project/OpenSearch/pull/20840))
Expand Down
2 changes: 2 additions & 0 deletions plugins/ingestion-kafka/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ dependencies {
testImplementation "org.apache.commons:commons-lang3:${versions.commonslang}"
testImplementation "commons-io:commons-io:${versions.commonsio}"
testImplementation 'org.awaitility:awaitility:4.2.0'
testImplementation project(':modules:ingest-common')
testImplementation project(':modules:lang-painless')
}

internalClusterTest{
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.opensearch.index.translog.TranslogStats;
import org.opensearch.index.translog.listener.CompositeTranslogEventListener;
import org.opensearch.indices.pollingingest.DefaultStreamPoller;
import org.opensearch.indices.pollingingest.IngestPipelineExecutor;
import org.opensearch.indices.pollingingest.IngestionErrorStrategy;
import org.opensearch.indices.pollingingest.IngestionSettings;
import org.opensearch.indices.pollingingest.PollingIngestStats;
Expand All @@ -61,17 +62,17 @@ public class IngestionEngine extends InternalEngine {
private StreamPoller streamPoller;
private final IngestionConsumerFactory ingestionConsumerFactory;
private final DocumentMapperForType documentMapperForType;
private final IngestService ingestService;
private final IngestPipelineExecutor pipelineExecutor;
private volatile IngestionShardPointer lastCommittedBatchStartPointer;

/**
 * Creates an IngestionEngine without ingest pipeline support.
 * Delegates with a null IngestService, so no IngestPipelineExecutor is created and polled
 * documents are indexed without pipeline processing.
 */
public IngestionEngine(EngineConfig engineConfig, IngestionConsumerFactory ingestionConsumerFactory) {
    this(engineConfig, ingestionConsumerFactory, null);
}

/**
 * Creates an IngestionEngine.
 *
 * @param engineConfig engine configuration; also used to resolve the index name and settings
 *        for pipeline resolution
 * @param ingestionConsumerFactory factory for stream consumers; must not be null
 * @param ingestService ingest service used to execute ingest pipelines; may be null, in which
 *        case documents are indexed without pipeline processing
 */
public IngestionEngine(EngineConfig engineConfig, IngestionConsumerFactory ingestionConsumerFactory, IngestService ingestService) {
    super(engineConfig);
    this.ingestionConsumerFactory = Objects.requireNonNull(ingestionConsumerFactory);
    // Tolerate a null IngestService here: the two-argument constructor delegates with null,
    // and an unconditional Objects.requireNonNull(ingestService) would make that constructor
    // always throw NPE. A null executor simply disables pipeline execution downstream.
    this.pipelineExecutor = ingestService == null
        ? null
        : new IngestPipelineExecutor(
            ingestService,
            engineConfig.getIndexSettings().getIndex().getName(),
            engineConfig.getIndexSettings()
        );
    this.documentMapperForType = engineConfig.getDocumentMapperForTypeSupplier().get();
    registerDynamicIndexSettingsHandlers();
}
Expand Down Expand Up @@ -155,6 +156,7 @@ private void initializeStreamPoller(
.pointerBasedLagUpdateInterval(ingestionSource.getPointerBasedLagUpdateInterval().millis())
.mapperType(ingestionSource.getMapperType())
.mapperSettings(ingestionSource.getMapperSettings())
.pipelineExecutor(pipelineExecutor)
.warmupConfig(ingestionSource.getWarmupConfig())
.build();
registerStreamPollerListener();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,14 +117,22 @@ private DefaultStreamPoller(
long pointerBasedLagUpdateIntervalMs,
IngestionMessageMapper.MapperType mapperType,
Map<String, Object> mapperSettings,
IngestPipelineExecutor pipelineExecutor,
IngestionSource.WarmupConfig warmupConfig
) {
this(
startPointer,
consumerFactory,
consumerClientId,
shardId,
new PartitionedBlockingQueueContainer(numProcessorThreads, shardId, ingestionEngine, errorStrategy, blockingQueueSize),
new PartitionedBlockingQueueContainer(
numProcessorThreads,
shardId,
ingestionEngine,
errorStrategy,
blockingQueueSize,
pipelineExecutor
),
resetState,
resetValue,
errorStrategy,
Expand Down Expand Up @@ -732,6 +740,7 @@ public static class Builder {
private long pointerBasedLagUpdateIntervalMs = 10000;
private IngestionMessageMapper.MapperType mapperType = IngestionMessageMapper.MapperType.DEFAULT;
private Map<String, Object> mapperSettings = Collections.emptyMap();
private IngestPipelineExecutor pipelineExecutor;
// Warmup configuration - default matches IndexMetadata settings
private IngestionSource.WarmupConfig warmupConfig = new IngestionSource.WarmupConfig(TimeValue.timeValueMillis(-1), 100L);

Expand Down Expand Up @@ -842,7 +851,14 @@ public Builder mapperSettings(Map<String, Object> mapperSettings) {
}

/**
 * Sets the pipeline executor used to run ingest pipelines on polled documents.
 * May be left unset (null) when no ingest pipeline execution is required.
 */
public Builder pipelineExecutor(IngestPipelineExecutor pipelineExecutor) {
    this.pipelineExecutor = pipelineExecutor;
    return this;
}

/**
* Set warmup configuration
*/
public Builder warmupConfig(IngestionSource.WarmupConfig warmupConfig) {
Expand Down Expand Up @@ -871,6 +887,7 @@ public DefaultStreamPoller build() {
pointerBasedLagUpdateIntervalMs,
mapperType,
mapperSettings,
pipelineExecutor,
warmupConfig
);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,172 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.indices.pollingingest;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.action.index.IndexRequest;
import org.opensearch.common.Nullable;
import org.opensearch.index.IndexSettings;
import org.opensearch.ingest.IngestService;
import org.opensearch.threadpool.ThreadPool;

import java.util.Collections;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;

/**
* Handles ingest pipeline resolution and execution for pull-based ingestion.
*
* <p>Resolves configured pipelines from index settings at initialization and executes them
* synchronously by bridging IngestService's async callback API with CompletableFuture.
* Also registers a dynamic settings listener to pick up runtime changes to {@code final_pipeline}.
* Only {@code final_pipeline} is supported.
*
* <p>Unlike push-based indexing, pipeline execution in pull-based ingestion does not require the
* node to have the {@code ingest} role. Transformations are executed locally on the node hosting the
* shard, and requests are not forwarded to dedicated ingest nodes.
*/
public class IngestPipelineExecutor {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can highlight in the javadocs that ingest pipeline/processors on pull-based ingestion flow does not require INGEST roles and executes the transformations on the current node (request is not forwarded to ingest nodes).

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah missed adding that. Added now


private static final Logger logger = LogManager.getLogger(IngestPipelineExecutor.class);

// TODO: consider making this configurable via index settings if use cases with slow processors arise
static final long PIPELINE_EXECUTION_TIMEOUT_SECONDS = 30;

private final IngestService ingestService;
private final String index;
private volatile String resolvedFinalPipeline;

/**
* Creates an IngestPipelineExecutor for the given index.
* Resolves the final pipeline from index settings and registers a dynamic settings listener.
*
* @param ingestService the ingest service for pipeline execution
* @param index the index name
* @param indexSettings the index settings to resolve a pipeline from and register listener on
*/
public IngestPipelineExecutor(IngestService ingestService, String index, IndexSettings indexSettings) {
this.ingestService = Objects.requireNonNull(ingestService);
this.index = Objects.requireNonNull(index);
indexSettings.getScopedSettings().addSettingsUpdateConsumer(IndexSettings.FINAL_PIPELINE, this::updateFinalPipeline);
updateFinalPipeline(IndexSettings.FINAL_PIPELINE.get(indexSettings.getSettings()));
}

/**
* Visible for testing. Creates an executor with a pre-resolved pipeline name,
* bypassing resolution from index settings.
*
* @param ingestService the ingest service
* @param index the index name
* @param finalPipeline the resolved final pipeline name, or null if no pipeline is configured
*/
IngestPipelineExecutor(IngestService ingestService, String index, @Nullable String finalPipeline) {
this.ingestService = Objects.requireNonNull(ingestService);
this.index = Objects.requireNonNull(index);
this.resolvedFinalPipeline = finalPipeline;
}

/**
* Updates the cached final pipeline name. Called on initial resolution and on dynamic settings change.
*/
void updateFinalPipeline(String finalPipeline) {
if (IngestService.NOOP_PIPELINE_NAME.equals(finalPipeline)) {
resolvedFinalPipeline = null;
} else {
resolvedFinalPipeline = finalPipeline;
}
}

/**
* Executes final_pipeline on the source map synchronously using CompletableFuture to bridge
* IngestService's async callback API.
*
* @param id document ID
* @param sourceMap source map to transform
* @return the transformed source map, or null if the document was dropped by the pipeline
* @throws Exception if pipeline execution fails
*/
public Map<String, Object> executePipelines(String id, Map<String, Object> sourceMap) throws Exception {
final String finalPipeline = resolvedFinalPipeline;
if (finalPipeline == null) {
return sourceMap;
}

// Build IndexRequest to carry the document through the pipeline
IndexRequest indexRequest = new IndexRequest(index);
indexRequest.id(id);
indexRequest.source(sourceMap);

indexRequest.setPipeline(IngestService.NOOP_PIPELINE_NAME);
indexRequest.setFinalPipeline(finalPipeline);
indexRequest.isPipelineResolved(true);

final String originalId = id;
final String originalRouting = indexRequest.routing();

CompletableFuture<Void> future = new CompletableFuture<>();
AtomicBoolean dropped = new AtomicBoolean(false);

ingestService.executeBulkRequest(
1,
Collections.singletonList(indexRequest),
(slot, e) -> future.completeExceptionally(e),
(thread, e) -> {
if (e != null) {
future.completeExceptionally(e);
} else {
future.complete(null);
}
},
slot -> dropped.set(true),
ThreadPool.Names.WRITE
);

// Block until pipeline execution completes (with timeout)
try {
future.get(PIPELINE_EXECUTION_TIMEOUT_SECONDS, TimeUnit.SECONDS);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm wondering if it would be better to add synchronous execution support in IngestService, something like

executePipelineSync(..) {
  CountDownLatch latch = new CountDownLatch(1);
  // execute the pipeline (ex: innerExecute(..))
  latch.await()
}

If this is possible, we could possibly execute the pipelines on the same thread avoiding the thread handoff. For async pipelines, it would still continue to wait for the result to be available.

What do you think? Have we already explored this path and run into any other challenges?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, executing on same thread would avoid the handoff overhead. But there are a few things I considered -

  1. IngestService is a core class used by all push based indexing. Adding a sync execution might modify a stable interface and would need deeper review which could be beyond the scope of this PR. Additionally, runBulkRequestInBatch() handles batching, metrics tracking, pipeline chaining, index change detection, and slot management. While we can expose a sync path through all of that but looks non trivial. Wdyt?
  2. For async processors, we'd still need a latch/future to block for those cases. The internal Pipeline.execute() -> IngestDocument.executePipeline() chain is fundamentally callback-based
  3. And there seem to be no practical impact for most processors(low weight simpler ones) as execution time dominates the context switch cost

Even with above nuances, this could be a valid optimization and can be taken up as a follow up when we benchmark our changes. Can create a tracking issue for this. Let me know your thoughts

} catch (TimeoutException e) {
throw new RuntimeException("Ingest pipeline execution timed out after [" + PIPELINE_EXECUTION_TIMEOUT_SECONDS + "] seconds", e);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException("Ingest pipeline execution was interrupted", e);
} catch (ExecutionException e) {
throw new RuntimeException("Ingest pipeline execution failed", e.getCause());
}

if (dropped.get()) {
return null;
}

// verify _id and _routing have not been mutated
if (Objects.equals(originalId, indexRequest.id()) == false) {
throw new IllegalStateException(
"Ingest pipeline attempted to change _id from ["
+ originalId
+ "] to ["
+ indexRequest.id()
+ "]. _id mutations are not allowed in pull-based ingestion."
);
}
if (Objects.equals(originalRouting, indexRequest.routing()) == false) {
throw new IllegalStateException(
"Ingest pipeline attempted to change _routing. _routing mutations are not allowed in pull-based ingestion."
);
}

// _index change is already blocked by final_pipeline semantics in IngestService

return indexRequest.sourceAsMap();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
package org.opensearch.indices.pollingingest;

import org.opensearch.cluster.metadata.IngestionSource;
import org.opensearch.common.Nullable;
import org.opensearch.index.IngestionConsumerFactory;
import org.opensearch.index.engine.Engine;
import org.opensearch.index.engine.EngineConfig;
Expand All @@ -27,12 +26,11 @@
public class IngestionEngineFactory implements EngineFactory {

private final IngestionConsumerFactory ingestionConsumerFactory;
@Nullable
private final Supplier<IngestService> ingestServiceSupplier;

/**
 * Creates a factory for ingestion engines.
 *
 * @param ingestionConsumerFactory factory producing consumers for the pull-based ingestion source;
 *        must not be null
 * @param ingestServiceSupplier supplier of the IngestService used for ingest pipeline execution;
 *        must not be null. NOTE(review): a supplier is presumably used because the IngestService
 *        may not be fully constructed when this factory is created — confirm against callers.
 */
public IngestionEngineFactory(IngestionConsumerFactory ingestionConsumerFactory, Supplier<IngestService> ingestServiceSupplier) {
    this.ingestionConsumerFactory = Objects.requireNonNull(ingestionConsumerFactory);
    this.ingestServiceSupplier = Objects.requireNonNull(ingestServiceSupplier);
}

/**
Expand All @@ -45,9 +43,8 @@ public Engine newReadWriteEngine(EngineConfig config) {
IngestionSource ingestionSource = config.getIndexSettings().getIndexMetadata().getIngestionSource();
boolean isAllActiveIngestion = ingestionSource != null && ingestionSource.isAllActiveIngestionEnabled();

IngestService ingestService = ingestServiceSupplier != null ? ingestServiceSupplier.get() : null;
assert ingestService != null || ingestServiceSupplier == null
: "IngestService supplier returned null. This indicates a initialization ordering issue.";
IngestService ingestService = ingestServiceSupplier.get();
assert ingestService != null : "IngestService supplier returned null. This indicates a initialization ordering issue.";

if (isAllActiveIngestion) {
// use ingestion engine on both primary and replica in all-active mode
Expand Down
Loading
Loading