Skip to content

Commit 9cf800a

Browse files
authored
Merge branch 'main' into fix/flaky-gauge-tests
2 parents bcd28b9 + dbe98aa commit 9cf800a

File tree

14 files changed

+1538
-73
lines changed

14 files changed

+1538
-73
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
1919
- Implement FieldMappingIngestionMessageMapper for pull-based ingestion ([#20729](https://github.com/opensearch-project/OpenSearch/pull/20729))
2020
- Added support of WarmerRefreshListener in NRTReplicationEngine to trigger warmer after replication on replica shards ([#20650](https://github.com/opensearch-project/OpenSearch/pull/20650))
2121
- WLM group custom search settings - groundwork and timeout ([#20536](https://github.com/opensearch-project/OpenSearch/issues/20536))
22+
- Add ingest pipeline support for pull-based ingestion ([#20873](https://github.com/opensearch-project/OpenSearch/issues/20873))
2223
- Expose JVM runtime metrics via telemetry framework ([#20844](https://github.com/opensearch-project/OpenSearch/pull/20844))
2324
- Add intra segment support for single-value metric aggregations ([#20503](https://github.com/opensearch-project/OpenSearch/pull/20503))
2425
- Add new setting property 'Sensitive' for tiering dynamic settings ([#20901](https://github.com/opensearch-project/OpenSearch/pull/20901))

build.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ plugins {
5555
id 'lifecycle-base'
5656
id 'opensearch.docker-support'
5757
id 'opensearch.global-build-info'
58-
id "com.diffplug.spotless" version "8.0.0" apply false
58+
id "com.diffplug.spotless" version "8.4.0" apply false
5959
id "org.gradle.test-retry" version "1.6.2" apply false
6060
id "test-report-aggregation"
6161
id 'jacoco-report-aggregation'

plugins/ingestion-kafka/build.gradle

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,8 @@ dependencies {
4444
testImplementation "org.apache.commons:commons-lang3:${versions.commonslang}"
4545
testImplementation "commons-io:commons-io:${versions.commonsio}"
4646
testImplementation 'org.awaitility:awaitility:4.2.0'
47+
testImplementation project(':modules:ingest-common')
48+
testImplementation project(':modules:lang-painless')
4749
}
4850

4951
internalClusterTest{

plugins/ingestion-kafka/src/internalClusterTest/java/org/opensearch/plugin/kafka/IngestPipelineFromKafkaIT.java

Lines changed: 869 additions & 0 deletions
Large diffs are not rendered by default.

server/src/main/java/org/opensearch/index/engine/IngestionEngine.java

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import org.opensearch.index.translog.TranslogStats;
3838
import org.opensearch.index.translog.listener.CompositeTranslogEventListener;
3939
import org.opensearch.indices.pollingingest.DefaultStreamPoller;
40+
import org.opensearch.indices.pollingingest.IngestPipelineExecutor;
4041
import org.opensearch.indices.pollingingest.IngestionErrorStrategy;
4142
import org.opensearch.indices.pollingingest.IngestionSettings;
4243
import org.opensearch.indices.pollingingest.PollingIngestStats;
@@ -62,17 +63,17 @@ public class IngestionEngine extends InternalEngine {
6263
private StreamPoller streamPoller;
6364
private final IngestionConsumerFactory ingestionConsumerFactory;
6465
private final DocumentMapperForType documentMapperForType;
65-
private final IngestService ingestService;
66+
private final IngestPipelineExecutor pipelineExecutor;
6667
private volatile IngestionShardPointer lastCommittedBatchStartPointer;
6768

68-
public IngestionEngine(EngineConfig engineConfig, IngestionConsumerFactory ingestionConsumerFactory) {
69-
this(engineConfig, ingestionConsumerFactory, null);
70-
}
71-
7269
public IngestionEngine(EngineConfig engineConfig, IngestionConsumerFactory ingestionConsumerFactory, IngestService ingestService) {
7370
super(engineConfig);
7471
this.ingestionConsumerFactory = Objects.requireNonNull(ingestionConsumerFactory);
75-
this.ingestService = ingestService;
72+
this.pipelineExecutor = new IngestPipelineExecutor(
73+
Objects.requireNonNull(ingestService),
74+
engineConfig.getIndexSettings().getIndex().getName(),
75+
engineConfig.getIndexSettings()
76+
);
7677
this.documentMapperForType = engineConfig.getDocumentMapperForTypeSupplier().get();
7778
registerDynamicIndexSettingsHandlers();
7879
}
@@ -156,6 +157,7 @@ private void initializeStreamPoller(
156157
.pointerBasedLagUpdateInterval(ingestionSource.getPointerBasedLagUpdateInterval().millis())
157158
.mapperType(ingestionSource.getMapperType())
158159
.mapperSettings(ingestionSource.getMapperSettings())
160+
.pipelineExecutor(pipelineExecutor)
159161
.warmupConfig(ingestionSource.getWarmupConfig())
160162
.build();
161163
registerStreamPollerListener();

server/src/main/java/org/opensearch/indices/pollingingest/DefaultStreamPoller.java

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -117,14 +117,22 @@ private DefaultStreamPoller(
117117
long pointerBasedLagUpdateIntervalMs,
118118
IngestionMessageMapper.MapperType mapperType,
119119
Map<String, Object> mapperSettings,
120+
IngestPipelineExecutor pipelineExecutor,
120121
IngestionSource.WarmupConfig warmupConfig
121122
) {
122123
this(
123124
startPointer,
124125
consumerFactory,
125126
consumerClientId,
126127
shardId,
127-
new PartitionedBlockingQueueContainer(numProcessorThreads, shardId, ingestionEngine, errorStrategy, blockingQueueSize),
128+
new PartitionedBlockingQueueContainer(
129+
numProcessorThreads,
130+
shardId,
131+
ingestionEngine,
132+
errorStrategy,
133+
blockingQueueSize,
134+
pipelineExecutor
135+
),
128136
resetState,
129137
resetValue,
130138
errorStrategy,
@@ -754,6 +762,7 @@ public static class Builder {
754762
private long pointerBasedLagUpdateIntervalMs = 10000;
755763
private IngestionMessageMapper.MapperType mapperType = IngestionMessageMapper.MapperType.DEFAULT;
756764
private Map<String, Object> mapperSettings = Collections.emptyMap();
765+
private IngestPipelineExecutor pipelineExecutor;
757766
// Warmup configuration - default matches IndexMetadata settings
758767
private IngestionSource.WarmupConfig warmupConfig = new IngestionSource.WarmupConfig(TimeValue.timeValueMillis(-1), 100L);
759768

@@ -864,7 +873,14 @@ public Builder mapperSettings(Map<String, Object> mapperSettings) {
864873
}
865874

866875
/**
867-
* Set warmup enabled
876+
* Set pipeline executor for ingest pipeline execution
877+
*/
878+
public Builder pipelineExecutor(IngestPipelineExecutor pipelineExecutor) {
879+
this.pipelineExecutor = pipelineExecutor;
880+
return this;
881+
}
882+
883+
/**
868884
* Set warmup configuration
869885
*/
870886
public Builder warmupConfig(IngestionSource.WarmupConfig warmupConfig) {
@@ -893,6 +909,7 @@ public DefaultStreamPoller build() {
893909
pointerBasedLagUpdateIntervalMs,
894910
mapperType,
895911
mapperSettings,
912+
pipelineExecutor,
896913
warmupConfig
897914
);
898915
}
Lines changed: 175 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,175 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.indices.pollingingest;
10+
11+
import org.apache.logging.log4j.LogManager;
12+
import org.apache.logging.log4j.Logger;
13+
import org.opensearch.action.index.IndexRequest;
14+
import org.opensearch.common.Nullable;
15+
import org.opensearch.index.IndexSettings;
16+
import org.opensearch.ingest.IngestService;
17+
import org.opensearch.threadpool.ThreadPool;
18+
19+
import java.util.Collections;
20+
import java.util.Map;
21+
import java.util.Objects;
22+
import java.util.concurrent.CompletableFuture;
23+
import java.util.concurrent.ExecutionException;
24+
import java.util.concurrent.TimeUnit;
25+
import java.util.concurrent.TimeoutException;
26+
import java.util.concurrent.atomic.AtomicBoolean;
27+
28+
/**
29+
* Handles ingest pipeline resolution and execution for pull-based ingestion.
30+
*
31+
* <p>Resolves configured pipelines from index settings at initialization and executes them
32+
* synchronously by bridging IngestService's async callback API with CompletableFuture.
33+
* Also registers a dynamic settings listener to pick up runtime changes to {@code final_pipeline}.
34+
* Only {@code final_pipeline} is supported.
35+
*
36+
* <p>Unlike push-based indexing, pipeline execution in pull-based ingestion does not require the
37+
* node to have the {@code ingest} role. Transformations are executed locally on the node hosting the
38+
* shard, and requests are not forwarded to dedicated ingest nodes.
39+
*/
40+
public class IngestPipelineExecutor {
41+
42+
private static final Logger logger = LogManager.getLogger(IngestPipelineExecutor.class);
43+
44+
// TODO: consider making this configurable via index settings if use cases with slow processors arise
45+
static final long PIPELINE_EXECUTION_TIMEOUT_SECONDS = 30;
46+
47+
// TODO: explore synchronous pipeline execution (IngestService.executeBulkRequestSync) to avoid
48+
// thread pool dispatch and execute pipelines directly on the processor thread
49+
50+
private final IngestService ingestService;
51+
private final String index;
52+
private volatile String resolvedFinalPipeline;
53+
54+
/**
55+
* Creates an IngestPipelineExecutor for the given index.
56+
* Resolves the final pipeline from index settings and registers a dynamic settings listener.
57+
*
58+
* @param ingestService the ingest service for pipeline execution
59+
* @param index the index name
60+
* @param indexSettings the index settings to resolve a pipeline from and register listener on
61+
*/
62+
public IngestPipelineExecutor(IngestService ingestService, String index, IndexSettings indexSettings) {
63+
this.ingestService = Objects.requireNonNull(ingestService);
64+
this.index = Objects.requireNonNull(index);
65+
indexSettings.getScopedSettings().addSettingsUpdateConsumer(IndexSettings.FINAL_PIPELINE, this::updateFinalPipeline);
66+
updateFinalPipeline(IndexSettings.FINAL_PIPELINE.get(indexSettings.getSettings()));
67+
}
68+
69+
/**
70+
* Visible for testing. Creates an executor with a pre-resolved pipeline name,
71+
* bypassing resolution from index settings.
72+
*
73+
* @param ingestService the ingest service
74+
* @param index the index name
75+
* @param finalPipeline the resolved final pipeline name, or null if no pipeline is configured
76+
*/
77+
IngestPipelineExecutor(IngestService ingestService, String index, @Nullable String finalPipeline) {
78+
this.ingestService = Objects.requireNonNull(ingestService);
79+
this.index = Objects.requireNonNull(index);
80+
this.resolvedFinalPipeline = finalPipeline;
81+
}
82+
83+
/**
84+
* Updates the cached final pipeline name. Called on initial resolution and on dynamic settings change.
85+
*/
86+
void updateFinalPipeline(String finalPipeline) {
87+
if (IngestService.NOOP_PIPELINE_NAME.equals(finalPipeline)) {
88+
resolvedFinalPipeline = null;
89+
} else {
90+
resolvedFinalPipeline = finalPipeline;
91+
}
92+
}
93+
94+
/**
95+
* Executes final_pipeline on the source map synchronously using CompletableFuture to bridge
96+
* IngestService's async callback API.
97+
*
98+
* @param id document ID
99+
* @param sourceMap source map to transform
100+
* @return the transformed source map, or null if the document was dropped by the pipeline
101+
* @throws Exception if pipeline execution fails
102+
*/
103+
public Map<String, Object> executePipelines(String id, Map<String, Object> sourceMap) throws Exception {
104+
final String finalPipeline = resolvedFinalPipeline;
105+
if (finalPipeline == null) {
106+
return sourceMap;
107+
}
108+
109+
// Build IndexRequest to carry the document through the pipeline
110+
IndexRequest indexRequest = new IndexRequest(index);
111+
indexRequest.id(id);
112+
indexRequest.source(sourceMap);
113+
114+
indexRequest.setPipeline(IngestService.NOOP_PIPELINE_NAME);
115+
indexRequest.setFinalPipeline(finalPipeline);
116+
indexRequest.isPipelineResolved(true);
117+
118+
final String originalId = id;
119+
final String originalRouting = indexRequest.routing();
120+
121+
CompletableFuture<Void> future = new CompletableFuture<>();
122+
AtomicBoolean dropped = new AtomicBoolean(false);
123+
124+
ingestService.executeBulkRequest(
125+
1,
126+
Collections.singletonList(indexRequest),
127+
(slot, e) -> future.completeExceptionally(e),
128+
(thread, e) -> {
129+
if (e != null) {
130+
future.completeExceptionally(e);
131+
} else {
132+
future.complete(null);
133+
}
134+
},
135+
slot -> dropped.set(true),
136+
ThreadPool.Names.WRITE
137+
);
138+
139+
// Block until pipeline execution completes (with timeout)
140+
try {
141+
future.get(PIPELINE_EXECUTION_TIMEOUT_SECONDS, TimeUnit.SECONDS);
142+
} catch (TimeoutException e) {
143+
throw new RuntimeException("Ingest pipeline execution timed out after [" + PIPELINE_EXECUTION_TIMEOUT_SECONDS + "] seconds", e);
144+
} catch (InterruptedException e) {
145+
Thread.currentThread().interrupt();
146+
throw new RuntimeException("Ingest pipeline execution was interrupted", e);
147+
} catch (ExecutionException e) {
148+
throw new RuntimeException("Ingest pipeline execution failed", e.getCause());
149+
}
150+
151+
if (dropped.get()) {
152+
return null;
153+
}
154+
155+
// verify _id and _routing have not been mutated
156+
if (Objects.equals(originalId, indexRequest.id()) == false) {
157+
throw new IllegalStateException(
158+
"Ingest pipeline attempted to change _id from ["
159+
+ originalId
160+
+ "] to ["
161+
+ indexRequest.id()
162+
+ "]. _id mutations are not allowed in pull-based ingestion."
163+
);
164+
}
165+
if (Objects.equals(originalRouting, indexRequest.routing()) == false) {
166+
throw new IllegalStateException(
167+
"Ingest pipeline attempted to change _routing. _routing mutations are not allowed in pull-based ingestion."
168+
);
169+
}
170+
171+
// _index change is already blocked by final_pipeline semantics in IngestService
172+
173+
return indexRequest.sourceAsMap();
174+
}
175+
}

server/src/main/java/org/opensearch/indices/pollingingest/IngestionEngineFactory.java

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@
99
package org.opensearch.indices.pollingingest;
1010

1111
import org.opensearch.cluster.metadata.IngestionSource;
12-
import org.opensearch.common.Nullable;
1312
import org.opensearch.index.IngestionConsumerFactory;
1413
import org.opensearch.index.engine.Engine;
1514
import org.opensearch.index.engine.EngineConfig;
@@ -27,12 +26,11 @@
2726
public class IngestionEngineFactory implements EngineFactory {
2827

2928
private final IngestionConsumerFactory ingestionConsumerFactory;
30-
@Nullable
3129
private final Supplier<IngestService> ingestServiceSupplier;
3230

3331
public IngestionEngineFactory(IngestionConsumerFactory ingestionConsumerFactory, Supplier<IngestService> ingestServiceSupplier) {
3432
this.ingestionConsumerFactory = Objects.requireNonNull(ingestionConsumerFactory);
35-
this.ingestServiceSupplier = ingestServiceSupplier;
33+
this.ingestServiceSupplier = Objects.requireNonNull(ingestServiceSupplier);
3634
}
3735

3836
/**
@@ -45,9 +43,8 @@ public Engine newReadWriteEngine(EngineConfig config) {
4543
IngestionSource ingestionSource = config.getIndexSettings().getIndexMetadata().getIngestionSource();
4644
boolean isAllActiveIngestion = ingestionSource != null && ingestionSource.isAllActiveIngestionEnabled();
4745

48-
IngestService ingestService = ingestServiceSupplier != null ? ingestServiceSupplier.get() : null;
49-
assert ingestService != null || ingestServiceSupplier == null
50-
: "IngestService supplier returned null. This indicates a initialization ordering issue.";
46+
IngestService ingestService = ingestServiceSupplier.get();
47+
assert ingestService != null : "IngestService supplier returned null. This indicates a initialization ordering issue.";
5148

5249
if (isAllActiveIngestion) {
5350
// use ingestion engine on both primary and replica in all-active mode

0 commit comments

Comments
 (0)