diff --git a/CHANGELOG.md b/CHANGELOG.md index 208cddaa27a68..1f1bfb52b0d33 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -19,6 +19,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Implement FieldMappingIngestionMessageMapper for pull-based ingestion ([#20729](https://github.com/opensearch-project/OpenSearch/pull/20729)) - Added support of WarmerRefreshListener in NRTReplicationEngine to trigger warmer after replication on replica shards ([#20650](https://github.com/opensearch-project/OpenSearch/pull/20650)) - WLM group custom search settings - groundwork and timeout ([#20536](https://github.com/opensearch-project/OpenSearch/issues/20536)) +- Add ingest pipeline support for pull-based ingestion ([#20873](https://github.com/opensearch-project/OpenSearch/issues/20873)) - Expose JVM runtime metrics via telemetry framework ([#20844](https://github.com/opensearch-project/OpenSearch/pull/20844)) - Add intra segment support for single-value metric aggregations ([#20503](https://github.com/opensearch-project/OpenSearch/pull/20503)) - Add ref_path support for package-based hunspell dictionary loading ([#20840](https://github.com/opensearch-project/OpenSearch/pull/20840)) diff --git a/plugins/ingestion-kafka/build.gradle b/plugins/ingestion-kafka/build.gradle index 2c79d156f81d8..0cf5c533f817a 100644 --- a/plugins/ingestion-kafka/build.gradle +++ b/plugins/ingestion-kafka/build.gradle @@ -44,6 +44,8 @@ dependencies { testImplementation "org.apache.commons:commons-lang3:${versions.commonslang}" testImplementation "commons-io:commons-io:${versions.commonsio}" testImplementation 'org.awaitility:awaitility:4.2.0' + testImplementation project(':modules:ingest-common') + testImplementation project(':modules:lang-painless') } internalClusterTest{ diff --git a/plugins/ingestion-kafka/src/internalClusterTest/java/org/opensearch/plugin/kafka/IngestPipelineFromKafkaIT.java 
b/plugins/ingestion-kafka/src/internalClusterTest/java/org/opensearch/plugin/kafka/IngestPipelineFromKafkaIT.java new file mode 100644 index 0000000000000..b78d0ef98bfc8 --- /dev/null +++ b/plugins/ingestion-kafka/src/internalClusterTest/java/org/opensearch/plugin/kafka/IngestPipelineFromKafkaIT.java @@ -0,0 +1,967 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.plugin.kafka; + +import org.opensearch.action.ingest.DeletePipelineRequest; +import org.opensearch.action.ingest.GetPipelineRequest; +import org.opensearch.action.ingest.GetPipelineResponse; +import org.opensearch.action.ingest.PutPipelineRequest; +import org.opensearch.action.search.SearchResponse; +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.common.settings.Settings; +import org.opensearch.core.common.bytes.BytesArray; +import org.opensearch.core.xcontent.MediaTypeRegistry; +import org.opensearch.index.IndexSettings; +import org.opensearch.index.query.TermQueryBuilder; +import org.opensearch.indices.pollingingest.PollingIngestStats; +import org.opensearch.ingest.PipelineConfiguration; +import org.opensearch.ingest.common.IngestCommonModulePlugin; +import org.opensearch.painless.PainlessModulePlugin; +import org.opensearch.plugins.Plugin; +import org.opensearch.test.OpenSearchIntegTestCase; +import org.junit.After; + +import java.util.Arrays; +import java.util.Collection; +import java.util.Map; + +import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; +import static org.hamcrest.Matchers.is; + +/** + * Integration tests for ingest pipeline execution in pull-based Kafka ingestion. 
+ */ +@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0) +public class IngestPipelineFromKafkaIT extends KafkaIngestionBaseIT { + + @Override + protected Collection> nodePlugins() { + return Arrays.asList(KafkaPlugin.class, IngestCommonModulePlugin.class, PainlessModulePlugin.class); + } + + @After + public void cleanUpPipelines() { + try { + GetPipelineResponse response = client().admin().cluster().getPipeline(new GetPipelineRequest("*")).actionGet(); + for (PipelineConfiguration pipeline : response.pipelines()) { + client().admin().cluster().deletePipeline(new DeletePipelineRequest(pipeline.getId())).actionGet(); + } + } catch (Exception e) { + // ignore + } + } + + /** + * Test that a final_pipeline adds a field to documents ingested from Kafka. + * Uses built-in "set" processor. + */ + public void testFinalPipelineAddsField() throws Exception { + createPipeline("add_field_pipeline", "{\"processors\": [{\"set\": {\"field\": \"processed\", \"value\": true}}]}"); + + produceData("1", "alice", "25"); + produceData("2", "bob", "30"); + + createIndexWithPipeline("add_field_pipeline", 1, 0); + + waitForState(() -> { + refresh(indexName); + SearchResponse response = client().prepareSearch(indexName).get(); + if (response.getHits().getTotalHits().value() < 2) return false; + Map source1 = response.getHits().getHits()[0].getSourceAsMap(); + Map source2 = response.getHits().getHits()[1].getSourceAsMap(); + return Boolean.TRUE.equals(source1.get("processed")) && Boolean.TRUE.equals(source2.get("processed")); + }); + } + + /** + * Test that a pipeline that drops documents results in no indexed documents. + * Uses built-in "drop" processor. 
+ */ + public void testFinalPipelineDropsDocument() throws Exception { + createPipeline("drop_pipeline", "{\"processors\": [{\"drop\": {}}]}"); + + produceData("1", "alice", "25"); + produceData("2", "bob", "30"); + + createIndexWithPipeline("drop_pipeline", 1, 0); + + // Wait until both messages are processed, then verify none were indexed (all dropped) + waitForState(() -> { + PollingIngestStats stats = client().admin().indices().prepareStats(indexName).get().getIndex(indexName).getShards()[0] + .getPollingIngestStats(); + return stats != null && stats.getMessageProcessorStats().totalProcessedCount() >= 2; + }); + refresh(indexName); + SearchResponse response = client().prepareSearch(indexName).get(); + assertThat(response.getHits().getTotalHits().value(), is(0L)); + } + + /** + * Test that documents are indexed normally when no pipeline is configured. + */ + public void testNoPipelineConfigured() throws Exception { + produceData("1", "alice", "25"); + produceData("2", "bob", "30"); + + // Create index without pipeline (default settings) + createIndexWithDefaultSettings(1, 0); + + waitForState(() -> { + refresh(indexName); + SearchResponse response = client().prepareSearch(indexName).get(); + return response.getHits().getTotalHits().value() == 2; + }); + + // Verify documents have original fields and no pipeline-added fields + SearchResponse response = client().prepareSearch(indexName).get(); + for (int i = 0; i < response.getHits().getHits().length; i++) { + Map source = response.getHits().getHits()[i].getSourceAsMap(); + assertFalse("Document should not have 'processed' field", source.containsKey("processed")); + } + } + + /** + * Test that pipeline does NOT execute for delete operations. 
+ */ + public void testPipelineNotCalledForDeletes() throws Exception { + createPipeline("add_field_pipeline", "{\"processors\": [{\"set\": {\"field\": \"processed\", \"value\": true}}]}"); + + // Produce an index message, then a delete message + produceData("1", "alice", "25", defaultMessageTimestamp, "index"); + createIndexWithPipeline("add_field_pipeline", 1, 0); + + // Wait for the document to be indexed + waitForState(() -> { + refresh(indexName); + SearchResponse response = client().prepareSearch(indexName).setQuery(new TermQueryBuilder("_id", "1")).get(); + return response.getHits().getTotalHits().value() == 1; + }); + + // Now delete the document + produceData("1", "alice", "25", defaultMessageTimestamp, "delete"); + + // Verify document is deleted + waitForState(() -> { + refresh(indexName); + SearchResponse response = client().prepareSearch(indexName).setQuery(new TermQueryBuilder("_id", "1")).get(); + return response.getHits().getTotalHits().value() == 0; + }); + } + + /** + * Test pipeline that renames a field. + * Uses built-in "rename" processor. + */ + public void testPipelineRenamesField() throws Exception { + createPipeline("rename_pipeline", "{\"processors\": [{\"rename\": {\"field\": \"name\", \"target_field\": \"full_name\"}}]}"); + + produceData("1", "alice", "25"); + createIndexWithPipeline("rename_pipeline", 1, 0); + + waitForState(() -> { + refresh(indexName); + SearchResponse response = client().prepareSearch(indexName).setQuery(new TermQueryBuilder("_id", "1")).get(); + if (response.getHits().getTotalHits().value() < 1) return false; + Map source = response.getHits().getHits()[0].getSourceAsMap(); + return "alice".equals(source.get("full_name")) && !source.containsKey("name"); + }); + } + + /** + * Test that guardrails block a pipeline attempting to change _id. + * The message should fail and be handled by the error strategy (retried then dropped with DROP strategy). 
+ */ + public void testPipelineMutatingIdIsBlocked() throws Exception { + createPipeline("mutate_id_pipeline", "{\"processors\": [{\"script\": {\"source\": \"ctx._id = 'mutated_id'\"}}]}"); + + produceData("1", "alice", "25"); + createIndexWithPipeline("mutate_id_pipeline", 1, 0); + + // With DROP error strategy, the message should eventually be dropped after retries + // Wait until the message is processed (and dropped due to failure) + waitForState(() -> { + PollingIngestStats stats = client().admin().indices().prepareStats(indexName).get().getIndex(indexName).getShards()[0] + .getPollingIngestStats(); + return stats != null && stats.getMessageProcessorStats().totalFailuresDroppedCount() >= 1; + }); + refresh(indexName); + SearchResponse response = client().prepareSearch(indexName).get(); + assertThat(response.getHits().getTotalHits().value(), is(0L)); + } + + /** + * Test that pipeline execution works with multiple documents. + * Verifies that the pipeline independently processes each document. + */ + public void testPipelineWithMultipleDocuments() throws Exception { + createPipeline("add_field_pipeline", "{\"processors\": [{\"set\": {\"field\": \"pipeline_version\", \"value\": \"v1\"}}]}"); + + // Produce multiple documents + for (int i = 0; i < 10; i++) { + produceData(String.valueOf(i), "user" + i, String.valueOf(20 + i)); + } + + createIndexWithPipeline("add_field_pipeline", 1, 0); + + // Verify all documents are indexed with the pipeline field + waitForState(() -> { + refresh(indexName); + SearchResponse response = client().prepareSearch(indexName).get(); + if (response.getHits().getTotalHits().value() < 10) return false; + // Verify all docs have the pipeline field + for (int i = 0; i < response.getHits().getHits().length; i++) { + if (!"v1".equals(response.getHits().getHits()[i].getSourceAsMap().get("pipeline_version"))) { + return false; + } + } + return true; + }); + } + + /** + * Test dynamic update of the final_pipeline setting. 
+ * Documents ingested before the update should use the old pipeline, + * documents after should use the new pipeline. + */ + public void testDynamicPipelineUpdate() throws Exception { + createPipeline("pipeline_v1", "{\"processors\": [{\"set\": {\"field\": \"version\", \"value\": \"v1\"}}]}"); + createPipeline("pipeline_v2", "{\"processors\": [{\"set\": {\"field\": \"version\", \"value\": \"v2\"}}]}"); + + // Produce the first batch and create an index with v1 pipeline + produceData("1", "Alice", "25"); + createIndexWithPipeline("pipeline_v1", 1, 0); + + // Wait for the first document to be indexed with v1 + waitForState(() -> { + refresh(indexName); + SearchResponse response = client().prepareSearch(indexName).setQuery(new TermQueryBuilder("_id", "1")).get(); + if (response.getHits().getTotalHits().value() < 1) return false; + return "v1".equals(response.getHits().getHits()[0].getSourceAsMap().get("version")); + }); + + // Dynamically update pipeline to v2 + assertAcked( + client().admin() + .indices() + .prepareUpdateSettings(indexName) + .setSettings(Settings.builder().put(IndexSettings.FINAL_PIPELINE.getKey(), "pipeline_v2")) + .get() + ); + + // Produce second batch + produceData("2", "bob", "30"); + + // Verify second document uses v2 pipeline + waitForState(() -> { + refresh(indexName); + SearchResponse response = client().prepareSearch(indexName).setQuery(new TermQueryBuilder("_id", "2")).get(); + if (response.getHits().getTotalHits().value() < 1) return false; + return "v2".equals(response.getHits().getHits()[0].getSourceAsMap().get("version")); + }); + } + + /** + * Test removing final_pipeline dynamically (set to _none). + * Documents after removal should not have pipeline-added fields. 
+ */ + public void testRemovePipelineDynamically() throws Exception { + createPipeline("add_field_pipeline", "{\"processors\": [{\"set\": {\"field\": \"processed\", \"value\": true}}]}"); + + produceData("1", "alice", "25"); + createIndexWithPipeline("add_field_pipeline", 1, 0); + + // Wait for first document with pipeline + waitForState(() -> { + refresh(indexName); + SearchResponse response = client().prepareSearch(indexName).setQuery(new TermQueryBuilder("_id", "1")).get(); + if (response.getHits().getTotalHits().value() < 1) return false; + return Boolean.TRUE.equals(response.getHits().getHits()[0].getSourceAsMap().get("processed")); + }); + + // Remove pipeline + assertAcked( + client().admin() + .indices() + .prepareUpdateSettings(indexName) + .setSettings(Settings.builder().put(IndexSettings.FINAL_PIPELINE.getKey(), "_none")) + .get() + ); + + // Produce document after pipeline removal + produceData("2", "bob", "30"); + + // Verify the second document does NOT have the pipeline field + waitForState(() -> { + refresh(indexName); + SearchResponse response = client().prepareSearch(indexName).setQuery(new TermQueryBuilder("_id", "2")).get(); + if (response.getHits().getTotalHits().value() < 1) return false; + return !response.getHits().getHits()[0].getSourceAsMap().containsKey("processed"); + }); + } + + /** + * Test pipeline with field_mapping mapper type combined. + * Raw Kafka message (no envelope) → field_mapping extracts _id → pipeline transforms → Lucene. 
+ */ + public void testPipelineWithFieldMappingMapper() throws Exception { + createPipeline("enrich_pipeline", "{\"processors\": [{\"set\": {\"field\": \"enriched\", \"value\": true}}]}"); + + // Produce raw messages (no _id/_source envelope — field_mapping mapper handles it) + String rawPayload1 = "{\"user_id\": \"user1\", \"name\": \"alice\", \"age\": 25}"; + String rawPayload2 = "{\"user_id\": \"user2\", \"name\": \"bob\", \"age\": 30}"; + produceData(rawPayload1); + produceData(rawPayload2); + + // Create index with field_mapping mapper + final_pipeline + createIndex( + indexName, + Settings.builder() + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) + .put("ingestion_source.type", "kafka") + .put("ingestion_source.pointer.init.reset", "earliest") + .put("ingestion_source.param.topic", topicName) + .put("ingestion_source.param.bootstrap_servers", kafka.getBootstrapServers()) + .put("index.replication.type", "SEGMENT") + .put("ingestion_source.mapper_type", "field_mapping") + .put("ingestion_source.mapper_settings.id_field", "user_id") + .put(IndexSettings.FINAL_PIPELINE.getKey(), "enrich_pipeline") + .build(), + "{\"properties\":{\"name\":{\"type\": \"text\"},\"age\":{\"type\": \"integer\"}}}" + ); + + // Verify documents are indexed with field_mapping-extracted ID and pipeline-added field + waitForState(() -> { + refresh(indexName); + SearchResponse response = client().prepareSearch(indexName).setQuery(new TermQueryBuilder("_id", "user1")).get(); + if (response.getHits().getTotalHits().value() < 1) return false; + Map source = response.getHits().getHits()[0].getSourceAsMap(); + return Boolean.TRUE.equals(source.get("enriched")) && "alice".equals(source.get("name")); + }); + + waitForState(() -> { + refresh(indexName); + SearchResponse response = client().prepareSearch(indexName).setQuery(new TermQueryBuilder("_id", "user2")).get(); + if (response.getHits().getTotalHits().value() < 1) return false; + Map 
source = response.getHits().getHits()[0].getSourceAsMap(); + return Boolean.TRUE.equals(source.get("enriched")) && "bob".equals(source.get("name")); + }); + } + + // --- Transformation-specific tests --- + + /** + * RENAME: Rename a field from one name to another. + * Uses built-in "rename" processor. + */ + public void testTransformRename() throws Exception { + createPipeline("rename_pipeline2", "{\"processors\": [{\"rename\": {\"field\": \"name\", \"target_field\": \"full_name\"}}]}"); + + String payload = "{\"_id\":\"1\",\"_source\":{\"name\":\"alice\",\"age\":25},\"_op_type\":\"index\"}"; + produceData(payload); + createIndexWithPipeline("rename_pipeline2", 1, 0); + + waitForState(() -> { + refresh(indexName); + SearchResponse response = client().prepareSearch(indexName).setQuery(new TermQueryBuilder("_id", "1")).get(); + if (response.getHits().getTotalHits().value() < 1) return false; + Map source = response.getHits().getHits()[0].getSourceAsMap(); + return "alice".equals(source.get("full_name")) && !source.containsKey("name"); + }); + } + + /** + * COPY: Copy a field value to a new field, keeping the original. + * Uses built-in "copy" processor. 
+ */ + public void testTransformCopy() throws Exception { + createPipeline("copy_pipeline", "{\"processors\": [{\"copy\": {\"source_field\": \"name\", \"target_field\": \"name_copy\"}}]}"); + + String payload = "{\"_id\":\"1\",\"_source\":{\"name\":\"alice\",\"age\":25},\"_op_type\":\"index\"}"; + produceData(payload); + createIndexWithPipeline("copy_pipeline", 1, 0); + + waitForState(() -> { + refresh(indexName); + SearchResponse response = client().prepareSearch(indexName).setQuery(new TermQueryBuilder("_id", "1")).get(); + if (response.getHits().getTotalHits().value() < 1) return false; + Map source = response.getHits().getHits()[0].getSourceAsMap(); + return "alice".equals(source.get("name")) && "alice".equals(source.get("name_copy")); + }); + } + + /** + * STRING_TO_JSON: Parse a JSON string field into a structured object. + * Uses built-in "json" processor. + */ + public void testTransformStringToJson() throws Exception { + createPipeline("string_to_json_pipeline", "{\"processors\": [{\"json\": {\"field\": \"metadata\"}}]}"); + + String payload = "{\"_id\":\"1\",\"_source\":{\"name\":\"alice\",\"metadata\":\"{\\\"key\\\":\\\"value\\\",\\\"count\\\":42}\"}," + + "\"_op_type\":\"index\"}"; + produceData(payload); + createIndexWithPipeline("string_to_json_pipeline", 1, 0); + + waitForState(() -> { + refresh(indexName); + SearchResponse response = client().prepareSearch(indexName).setQuery(new TermQueryBuilder("_id", "1")).get(); + if (response.getHits().getTotalHits().value() < 1) return false; + Map source = response.getHits().getHits()[0].getSourceAsMap(); + Object metadata = source.get("metadata"); + if (!(metadata instanceof Map)) return false; + @SuppressWarnings("unchecked") + Map metadataMap = (Map) metadata; + return "value".equals(metadataMap.get("key")) && Integer.valueOf(42).equals(metadataMap.get("count")); + }); + } + + /** + * STRING_TO_LONG: Convert a string field to a long value. + * Uses built-in "convert" processor with type "long". 
+ */ + public void testTransformStringToLong() throws Exception { + createPipeline("string_to_long_pipeline", "{\"processors\": [{\"convert\": {\"field\": \"timestamp\", \"type\": \"long\"}}]}"); + + String payload = "{\"_id\":\"1\",\"_source\":{\"name\":\"alice\",\"timestamp\":\"1739459500000\"},\"_op_type\":\"index\"}"; + produceData(payload); + createIndexWithPipeline("string_to_long_pipeline", 1, 0); + + waitForState(() -> { + refresh(indexName); + SearchResponse response = client().prepareSearch(indexName).setQuery(new TermQueryBuilder("_id", "1")).get(); + if (response.getHits().getTotalHits().value() < 1) return false; + Map source = response.getHits().getHits()[0].getSourceAsMap(); + Object ts = source.get("timestamp"); + return ts instanceof Number && ((Number) ts).longValue() == 1739459500000L; + }); + } + + /** + * TYPE_CONVERSION: Convert a string field to integer. + * Uses built-in "convert" processor with type "integer". + */ + public void testTransformTypeConversion() throws Exception { + createPipeline("type_conversion_pipeline", "{\"processors\": [{\"convert\": {\"field\": \"age\", \"type\": \"integer\"}}]}"); + + String payload = "{\"_id\":\"1\",\"_source\":{\"name\":\"alice\",\"age\":\"25\"},\"_op_type\":\"index\"}"; + produceData(payload); + createIndexWithPipeline("type_conversion_pipeline", 1, 0); + + waitForState(() -> { + refresh(indexName); + SearchResponse response = client().prepareSearch(indexName).setQuery(new TermQueryBuilder("_id", "1")).get(); + if (response.getHits().getTotalHits().value() < 1) return false; + Map source = response.getHits().getHits()[0].getSourceAsMap(); + return source.get("age") instanceof Integer && (Integer) source.get("age") == 25; + }); + } + + /** + * GEO: Combine lat/lon fields into a geo_point object. + * Uses built-in "script" processor with Painless. 
+ */ + public void testTransformGeo() throws Exception { + createPipeline( + "geo_pipeline", + "{\"processors\": [{\"script\": {\"source\": " + + "\"ctx.location = ['lat': ctx.lat, 'lon': ctx.lon]; ctx.remove('lat'); ctx.remove('lon')\"}}]}" + ); + + String payload = "{\"_id\":\"1\",\"_source\":{\"name\":\"alice\",\"lat\":40.7128,\"lon\":-74.006},\"_op_type\":\"index\"}"; + produceData(payload); + + createIndex( + indexName, + Settings.builder() + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) + .put("ingestion_source.type", "kafka") + .put("ingestion_source.pointer.init.reset", "earliest") + .put("ingestion_source.param.topic", topicName) + .put("ingestion_source.param.bootstrap_servers", kafka.getBootstrapServers()) + .put("index.replication.type", "SEGMENT") + .put("ingestion_source.error_strategy", "drop") + .put(IndexSettings.FINAL_PIPELINE.getKey(), "geo_pipeline") + .build(), + "{\"properties\":{\"name\":{\"type\":\"text\"},\"location\":{\"type\":\"geo_point\"}}}" + ); + + waitForState(() -> { + refresh(indexName); + SearchResponse response = client().prepareSearch(indexName).setQuery(new TermQueryBuilder("_id", "1")).get(); + if (response.getHits().getTotalHits().value() < 1) return false; + Map source = response.getHits().getHits()[0].getSourceAsMap(); + Object location = source.get("location"); + if (!(location instanceof Map)) return false; + @SuppressWarnings("unchecked") + Map geoPoint = (Map) location; + return geoPoint.containsKey("lat") && geoPoint.containsKey("lon") && !source.containsKey("lat") && !source.containsKey("lon"); + }); + } + + /** + * EXCLUDE_EMPTY_VALUES: Remove fields with null or empty string values. + * Uses built-in "script" processor with Painless. 
+ */ + public void testTransformExcludeEmptyValues() throws Exception { + createPipeline( + "exclude_empty_pipeline", + "{\"processors\": [{\"script\": {\"source\": " + + "\"ctx.entrySet().removeIf(e -> e.getValue() == null " + + "|| (e.getValue() instanceof String && e.getValue().isEmpty()))\"}}]}" + ); + + String payload = "{\"_id\":\"1\",\"_source\":{\"name\":\"alice\",\"nickname\":\"\",\"bio\":null,\"age\":25}," + + "\"_op_type\":\"index\"}"; + produceData(payload); + createIndexWithPipeline("exclude_empty_pipeline", 1, 0); + + waitForState(() -> { + refresh(indexName); + SearchResponse response = client().prepareSearch(indexName).setQuery(new TermQueryBuilder("_id", "1")).get(); + if (response.getHits().getTotalHits().value() < 1) return false; + Map source = response.getHits().getHits()[0].getSourceAsMap(); + return "alice".equals(source.get("name")) + && !source.containsKey("nickname") + && !source.containsKey("bio") + && Integer.valueOf(25).equals(source.get("age")); + }); + } + + /** + * MAP_KEYS: Rename keys in a nested map object. + * Uses built-in "script" processor with Painless. 
+ */ + public void testTransformMapKeys() throws Exception { + createPipeline( + "map_keys_pipeline", + "{\"processors\": [{\"script\": {\"source\": " + + "\"def map = ctx.props; if (map.containsKey('old_key')) { map.put('new_key', map.remove('old_key')); }\"}}]}" + ); + + String payload = "{\"_id\":\"1\",\"_source\":{\"name\":\"alice\",\"props\":{\"old_key\":\"value1\",\"keep_key\":\"value2\"}}," + + "\"_op_type\":\"index\"}"; + produceData(payload); + createIndexWithPipeline("map_keys_pipeline", 1, 0); + + waitForState(() -> { + refresh(indexName); + SearchResponse response = client().prepareSearch(indexName).setQuery(new TermQueryBuilder("_id", "1")).get(); + if (response.getHits().getTotalHits().value() < 1) return false; + Map source = response.getHits().getHits()[0].getSourceAsMap(); + @SuppressWarnings("unchecked") + Map props = (Map) source.get("props"); + if (props == null) return false; + return "value1".equals(props.get("new_key")) && !props.containsKey("old_key") && "value2".equals(props.get("keep_key")); + }); + } + + /** + * DROP_FIELD: Remove specific fields from the document. + * Uses built-in "remove" processor. 
+ */ + public void testTransformDropField() throws Exception { + createPipeline("drop_field_pipeline", "{\"processors\": [{\"remove\": {\"field\": [\"internal_id\", \"debug\"]}}]}"); + + String payload = "{\"_id\":\"1\",\"_source\":{\"name\":\"alice\",\"age\":25,\"internal_id\":\"xyz\",\"debug\":true}," + + "\"_op_type\":\"index\"}"; + produceData(payload); + createIndexWithPipeline("drop_field_pipeline", 1, 0); + + waitForState(() -> { + refresh(indexName); + SearchResponse response = client().prepareSearch(indexName).setQuery(new TermQueryBuilder("_id", "1")).get(); + if (response.getHits().getTotalHits().value() < 1) return false; + Map source = response.getHits().getHits()[0].getSourceAsMap(); + return "alice".equals(source.get("name")) + && Integer.valueOf(25).equals(source.get("age")) + && !source.containsKey("internal_id") + && !source.containsKey("debug"); + }); + } + + /** + * NESTED_FIELD_EXTRACT: Extract a value from a nested object to a top-level field. + * Uses built-in "script" processor with Painless. + */ + public void testTransformNestedFieldExtract() throws Exception { + createPipeline("nested_extract_pipeline", "{\"processors\": [{\"script\": {\"source\": \"ctx.city = ctx.address.city\"}}]}"); + + String payload = "{\"_id\":\"1\",\"_source\":{\"name\":\"alice\",\"address\":{\"city\":\"New York\",\"zip\":\"10001\"}}," + + "\"_op_type\":\"index\"}"; + produceData(payload); + createIndexWithPipeline("nested_extract_pipeline", 1, 0); + + waitForState(() -> { + refresh(indexName); + SearchResponse response = client().prepareSearch(indexName).setQuery(new TermQueryBuilder("_id", "1")).get(); + if (response.getHits().getTotalHits().value() < 1) return false; + Map source = response.getHits().getHits()[0].getSourceAsMap(); + return "New York".equals(source.get("city")) && source.containsKey("address"); + }); + } + + /** + * FLATTEN_MAP: Flatten a nested map into dot-notation top-level fields. + * Uses built-in "script" processor with Painless. 
+ */ + public void testTransformFlattenMap() throws Exception { + createPipeline( + "flatten_pipeline", + "{\"processors\": [{\"script\": {\"source\": \"" + + "def map = ctx.remove('metadata'); " + + "for (entry in map.entrySet()) { " + + " if (entry.getValue() instanceof Map) { " + + " for (inner in entry.getValue().entrySet()) { " + + " ctx['metadata.' + entry.getKey() + '.' + inner.getKey()] = inner.getValue(); " + + " } " + + " } else { " + + " ctx['metadata.' + entry.getKey()] = entry.getValue(); " + + " } " + + "}\"}}]}" + ); + + String payload = "{\"_id\":\"1\",\"_source\":{\"name\":\"alice\"," + + "\"metadata\":{\"source\":\"kafka\",\"env\":\"prod\",\"nested\":{\"deep\":\"value\"}}}," + + "\"_op_type\":\"index\"}"; + produceData(payload); + createIndexWithPipeline("flatten_pipeline", 1, 0); + + waitForState(() -> { + refresh(indexName); + SearchResponse response = client().prepareSearch(indexName).setQuery(new TermQueryBuilder("_id", "1")).get(); + if (response.getHits().getTotalHits().value() < 1) return false; + Map source = response.getHits().getHits()[0].getSourceAsMap(); + return "kafka".equals(source.get("metadata.source")) + && "prod".equals(source.get("metadata.env")) + && "value".equals(source.get("metadata.nested.deep")) + && !source.containsKey("metadata"); + }); + } + + /** + * Combined transformations: RENAME + DROP_FIELD + ADD_FIELD in a single pipeline. + * Uses built-in "rename", "remove", and "set" processors. 
+ */ + public void testCombinedTransformations() throws Exception { + createPipeline( + "combined_pipeline", + "{\"processors\": [" + + "{\"rename\": {\"field\": \"name\", \"target_field\": \"full_name\"}}," + + "{\"remove\": {\"field\": \"internal_id\"}}," + + "{\"set\": {\"field\": \"processed_by\", \"value\": \"opensearch\"}}" + + "]}" + ); + + String payload = "{\"_id\":\"1\",\"_source\":{\"name\":\"alice\",\"age\":25,\"internal_id\":\"xyz\"}," + "\"_op_type\":\"index\"}"; + produceData(payload); + createIndexWithPipeline("combined_pipeline", 1, 0); + + waitForState(() -> { + refresh(indexName); + SearchResponse response = client().prepareSearch(indexName).setQuery(new TermQueryBuilder("_id", "1")).get(); + if (response.getHits().getTotalHits().value() < 1) return false; + Map source = response.getHits().getHits()[0].getSourceAsMap(); + return "alice".equals(source.get("full_name")) + && !source.containsKey("name") + && !source.containsKey("internal_id") + && "opensearch".equals(source.get("processed_by")) + && Integer.valueOf(25).equals(source.get("age")); + }); + } + + // --- Field Mapping + Pipeline Combined Tests (End-to-End) --- + + /** + * End-to-end: field_mapping extracts _id from raw message, pipeline renames a field. + * Raw Kafka → field_mapping (user_id → _id) → rename pipeline (name → full_name) → Lucene. 
+ */ + public void testFieldMappingWithRenamePipeline() throws Exception { + createPipeline("rename_pipeline", "{\"processors\": [{\"rename\": {\"field\": \"name\", \"target_field\": \"full_name\"}}]}"); + + produceData("{\"user_id\": \"u1\", \"name\": \"alice\", \"age\": 30}"); + produceData("{\"user_id\": \"u2\", \"name\": \"bob\", \"age\": 25}"); + + createFieldMappingIndexWithPipeline("rename_pipeline", "user_id", null, null, null, null); + + waitForState(() -> { + refresh(indexName); + SearchResponse response = client().prepareSearch(indexName).setQuery(new TermQueryBuilder("_id", "u1")).get(); + if (response.getHits().getTotalHits().value() < 1) return false; + Map source = response.getHits().getHits()[0].getSourceAsMap(); + return "alice".equals(source.get("full_name")) && !source.containsKey("name") && !source.containsKey("user_id"); + }); + } + + /** + * End-to-end: field_mapping extracts _id + _version, pipeline adds enrichment field. + * Tests that external versioning from field_mapping works alongside pipeline transforms. 
+ */ + public void testFieldMappingWithVersionAndPipeline() throws Exception { + createPipeline("enrich_pipeline", "{\"processors\": [{\"set\": {\"field\": \"source\", \"value\": \"kafka\"}}]}"); + + // Produce out-of-order versions — v200 first, then v100 + produceData("{\"user_id\": \"abc\", \"name\": \"alice_v2\", \"age\": 31, \"ts\": 200}"); + produceData("{\"user_id\": \"abc\", \"name\": \"alice_v1\", \"age\": 30, \"ts\": 100}"); + + createFieldMappingIndexWithPipeline("enrich_pipeline", "user_id", "ts", null, null, null); + + waitForState(() -> { + refresh(indexName); + SearchResponse response = client().prepareSearch(indexName).setQuery(new TermQueryBuilder("_id", "abc")).get(); + if (response.getHits().getTotalHits().value() != 1) return false; + Map source = response.getHits().getHits()[0].getSourceAsMap(); + // v2 should win (higher version), pipeline should have added "source" field + return "alice_v2".equals(source.get("name")) + && "kafka".equals(source.get("source")) + && !source.containsKey("user_id") + && !source.containsKey("ts"); + }); + } + + /** + * End-to-end: field_mapping extracts _id + op_type, pipeline transforms source, + * delete operation should NOT run through pipeline. 
+ */ + public void testFieldMappingDeleteWithPipeline() throws Exception { + createPipeline("add_tag_pipeline", "{\"processors\": [{\"set\": {\"field\": \"tag\", \"value\": \"processed\"}}]}"); + + // Index a document, then delete it using field_mapping op_type + produceData("{\"user_id\": \"abc\", \"name\": \"alice\", \"age\": 30, \"is_deleted\": \"false\"}"); + + createFieldMappingIndexWithPipeline("add_tag_pipeline", "user_id", null, "is_deleted", "true", null); + + // Wait for document to be indexed with pipeline tag + waitForState(() -> { + refresh(indexName); + SearchResponse response = client().prepareSearch(indexName).setQuery(new TermQueryBuilder("_id", "abc")).get(); + if (response.getHits().getTotalHits().value() < 1) return false; + Map source = response.getHits().getHits()[0].getSourceAsMap(); + return "processed".equals(source.get("tag")); + }); + + // Now delete the document + produceData("{\"user_id\": \"abc\", \"name\": \"alice\", \"age\": 30, \"is_deleted\": \"true\"}"); + + waitForState(() -> { + refresh(indexName); + SearchResponse response = client().prepareSearch(indexName).setQuery(new TermQueryBuilder("_id", "abc")).get(); + return response.getHits().getTotalHits().value() == 0; + }); + } + + /** + * End-to-end: field_mapping with all settings (id + version + op_type + create_value) + * combined with a multi-processor pipeline (rename + set + remove). + * Simulates a realistic Uber-style transformation pipeline. 
+ */ + public void testFieldMappingFullConfigWithMultiProcessorPipeline() throws Exception { + createPipeline( + "uber_pipeline", + "{\"processors\": [" + + "{\"rename\": {\"field\": \"name\", \"target_field\": \"full_name\"}}," + + "{\"set\": {\"field\": \"ingestion_source\", \"value\": \"pull_based\"}}," + + "{\"remove\": {\"field\": \"internal_flags\", \"ignore_missing\": true}}" + + "]}" + ); + + // INSERT (create) a document + produceData( + "{\"user_id\": \"u1\", \"name\": \"alice\", \"age\": 30, \"ts\": 100, " + + "\"action\": \"INSERT\", \"internal_flags\": \"debug\"}" + ); + // UPDATE (index) a document + produceData("{\"user_id\": \"u2\", \"name\": \"bob\", \"age\": 25, \"ts\": 200, \"action\": \"UPDATE\"}"); + + createFieldMappingIndexWithPipeline("uber_pipeline", "user_id", "ts", "action", "DELETE", "INSERT"); + + waitForState(() -> { + refresh(indexName); + SearchResponse response = client().prepareSearch(indexName).get(); + if (response.getHits().getTotalHits().value() != 2) return false; + + Map> docs = new java.util.HashMap<>(); + response.getHits().forEach(hit -> docs.put(hit.getId(), hit.getSourceAsMap())); + + if (!docs.containsKey("u1") || !docs.containsKey("u2")) return false; + + Map alice = docs.get("u1"); + Map bob = docs.get("u2"); + + // Both: renamed name→full_name, added ingestion_source, removed internal_flags + // field_mapping: user_id/ts/action removed from _source + return "alice".equals(alice.get("full_name")) + && !alice.containsKey("name") + && "pull_based".equals(alice.get("ingestion_source")) + && !alice.containsKey("internal_flags") + && !alice.containsKey("user_id") + && !alice.containsKey("ts") + && !alice.containsKey("action") + && "bob".equals(bob.get("full_name")) + && "pull_based".equals(bob.get("ingestion_source")); + }); + } + + /** + * End-to-end: field_mapping + script processor for complex transformation. + * Simulates STRING_TO_JSON + nested field extraction from raw Kafka messages. 
+ */ + public void testFieldMappingWithScriptTransformation() throws Exception { + createPipeline( + "script_pipeline", + "{\"processors\": [{\"script\": {\"source\": " + + "\"ctx.city = ctx.address.city; ctx.zip = ctx.address.zip; ctx.remove('address')\"}}]}" + ); + + produceData("{\"user_id\": \"u1\", \"name\": \"alice\", \"address\": {\"city\": \"New York\", \"zip\": \"10001\"}}"); + + createFieldMappingIndexWithPipeline("script_pipeline", "user_id", null, null, null, null); + + waitForState(() -> { + refresh(indexName); + SearchResponse response = client().prepareSearch(indexName).setQuery(new TermQueryBuilder("_id", "u1")).get(); + if (response.getHits().getTotalHits().value() < 1) return false; + Map source = response.getHits().getHits()[0].getSourceAsMap(); + return "alice".equals(source.get("name")) + && "New York".equals(source.get("city")) + && "10001".equals(source.get("zip")) + && !source.containsKey("address") + && !source.containsKey("user_id"); + }); + } + + /** + * End-to-end: field_mapping + drop processor. + * Pipeline drops all documents — verifies no documents are indexed even with field_mapping. 
+ */ + public void testFieldMappingWithDropPipeline() throws Exception { + createPipeline("drop_all_pipeline", "{\"processors\": [{\"drop\": {}}]}"); + + produceData("{\"user_id\": \"u1\", \"name\": \"alice\", \"age\": 30}"); + produceData("{\"user_id\": \"u2\", \"name\": \"bob\", \"age\": 25}"); + + createFieldMappingIndexWithPipeline("drop_all_pipeline", "user_id", null, null, null, null); + + // Wait until both messages are processed, then verify none were indexed (all dropped) + waitForState(() -> { + PollingIngestStats stats = client().admin().indices().prepareStats(indexName).get().getIndex(indexName).getShards()[0] + .getPollingIngestStats(); + return stats != null && stats.getMessageProcessorStats().totalProcessedCount() >= 2; + }); + refresh(indexName); + SearchResponse response = client().prepareSearch(indexName).get(); + assertThat(response.getHits().getTotalHits().value(), is(0L)); + } + + /** + * End-to-end: field_mapping + dynamic pipeline update. + * Start with a pipeline that adds "v1", dynamically switch to one that adds "v2". 
+ */ + public void testFieldMappingWithDynamicPipelineSwitch() throws Exception { + createPipeline("v1_pipeline", "{\"processors\": [{\"set\": {\"field\": \"version\", \"value\": \"v1\"}}]}"); + createPipeline("v2_pipeline", "{\"processors\": [{\"set\": {\"field\": \"version\", \"value\": \"v2\"}}]}"); + + produceData("{\"user_id\": \"u1\", \"name\": \"alice\", \"age\": 30}"); + + createFieldMappingIndexWithPipeline("v1_pipeline", "user_id", null, null, null, null); + + // Wait for first document with v1 + waitForState(() -> { + refresh(indexName); + SearchResponse response = client().prepareSearch(indexName).setQuery(new TermQueryBuilder("_id", "u1")).get(); + if (response.getHits().getTotalHits().value() < 1) return false; + return "v1".equals(response.getHits().getHits()[0].getSourceAsMap().get("version")); + }); + + // Switch pipeline + assertAcked( + client().admin() + .indices() + .prepareUpdateSettings(indexName) + .setSettings(Settings.builder().put(IndexSettings.FINAL_PIPELINE.getKey(), "v2_pipeline")) + .get() + ); + + produceData("{\"user_id\": \"u2\", \"name\": \"bob\", \"age\": 25}"); + + waitForState(() -> { + refresh(indexName); + SearchResponse response = client().prepareSearch(indexName).setQuery(new TermQueryBuilder("_id", "u2")).get(); + if (response.getHits().getTotalHits().value() < 1) return false; + return "v2".equals(response.getHits().getHits()[0].getSourceAsMap().get("version")); + }); + } + + // --- Helper methods --- + + private void createFieldMappingIndexWithPipeline( + String pipelineId, + String idField, + String versionField, + String opTypeField, + String deleteValue, + String createValue + ) { + Settings.Builder settings = Settings.builder() + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) + .put("ingestion_source.type", "kafka") + .put("ingestion_source.pointer.init.reset", "earliest") + .put("ingestion_source.param.topic", topicName) + 
.put("ingestion_source.param.bootstrap_servers", kafka.getBootstrapServers()) + .put("index.replication.type", "SEGMENT") + .put("ingestion_source.error_strategy", "drop") + .put("ingestion_source.mapper_type", "field_mapping") + .put("ingestion_source.mapper_settings.id_field", idField) + .put(IndexSettings.FINAL_PIPELINE.getKey(), pipelineId); + + if (versionField != null) { + settings.put("ingestion_source.mapper_settings.version_field", versionField); + } + if (opTypeField != null) { + settings.put("ingestion_source.mapper_settings.op_type_field", opTypeField); + } + if (deleteValue != null) { + settings.put("ingestion_source.mapper_settings.op_type_field.delete_value", deleteValue); + } + if (createValue != null) { + settings.put("ingestion_source.mapper_settings.op_type_field.create_value", createValue); + } + + createIndex(indexName, settings.build(), mapping); + } + + // --- Helper methods --- + + private void createPipeline(String pipelineId, String pipelineJson) { + client().admin() + .cluster() + .putPipeline(new PutPipelineRequest(pipelineId, new BytesArray(pipelineJson), MediaTypeRegistry.JSON)) + .actionGet(); + } + + private void createIndexWithPipeline(String pipelineId, int numShards, int numReplicas) { + createIndex( + indexName, + Settings.builder() + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, numShards) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, numReplicas) + .put("ingestion_source.type", "kafka") + .put("ingestion_source.pointer.init.reset", "earliest") + .put("ingestion_source.param.topic", topicName) + .put("ingestion_source.param.bootstrap_servers", kafka.getBootstrapServers()) + .put("index.replication.type", "SEGMENT") + .put("ingestion_source.error_strategy", "drop") + .put(IndexSettings.FINAL_PIPELINE.getKey(), pipelineId) + .build(), + mapping + ); + } + +} diff --git a/server/src/main/java/org/opensearch/index/engine/IngestionEngine.java b/server/src/main/java/org/opensearch/index/engine/IngestionEngine.java index 
ebcbd776135cd..62d291c92e02a 100644 --- a/server/src/main/java/org/opensearch/index/engine/IngestionEngine.java +++ b/server/src/main/java/org/opensearch/index/engine/IngestionEngine.java @@ -36,6 +36,7 @@ import org.opensearch.index.translog.TranslogStats; import org.opensearch.index.translog.listener.CompositeTranslogEventListener; import org.opensearch.indices.pollingingest.DefaultStreamPoller; +import org.opensearch.indices.pollingingest.IngestPipelineExecutor; import org.opensearch.indices.pollingingest.IngestionErrorStrategy; import org.opensearch.indices.pollingingest.IngestionSettings; import org.opensearch.indices.pollingingest.PollingIngestStats; @@ -61,17 +62,17 @@ public class IngestionEngine extends InternalEngine { private StreamPoller streamPoller; private final IngestionConsumerFactory ingestionConsumerFactory; private final DocumentMapperForType documentMapperForType; - private final IngestService ingestService; + private final IngestPipelineExecutor pipelineExecutor; private volatile IngestionShardPointer lastCommittedBatchStartPointer; - public IngestionEngine(EngineConfig engineConfig, IngestionConsumerFactory ingestionConsumerFactory) { - this(engineConfig, ingestionConsumerFactory, null); - } - public IngestionEngine(EngineConfig engineConfig, IngestionConsumerFactory ingestionConsumerFactory, IngestService ingestService) { super(engineConfig); this.ingestionConsumerFactory = Objects.requireNonNull(ingestionConsumerFactory); - this.ingestService = ingestService; + this.pipelineExecutor = new IngestPipelineExecutor( + Objects.requireNonNull(ingestService), + engineConfig.getIndexSettings().getIndex().getName(), + engineConfig.getIndexSettings() + ); this.documentMapperForType = engineConfig.getDocumentMapperForTypeSupplier().get(); registerDynamicIndexSettingsHandlers(); } @@ -155,6 +156,7 @@ private void initializeStreamPoller( .pointerBasedLagUpdateInterval(ingestionSource.getPointerBasedLagUpdateInterval().millis()) 
.mapperType(ingestionSource.getMapperType()) .mapperSettings(ingestionSource.getMapperSettings()) + .pipelineExecutor(pipelineExecutor) .warmupConfig(ingestionSource.getWarmupConfig()) .build(); registerStreamPollerListener(); diff --git a/server/src/main/java/org/opensearch/indices/pollingingest/DefaultStreamPoller.java b/server/src/main/java/org/opensearch/indices/pollingingest/DefaultStreamPoller.java index 4dc51e0539d83..e1b08061408de 100644 --- a/server/src/main/java/org/opensearch/indices/pollingingest/DefaultStreamPoller.java +++ b/server/src/main/java/org/opensearch/indices/pollingingest/DefaultStreamPoller.java @@ -117,6 +117,7 @@ private DefaultStreamPoller( long pointerBasedLagUpdateIntervalMs, IngestionMessageMapper.MapperType mapperType, Map mapperSettings, + IngestPipelineExecutor pipelineExecutor, IngestionSource.WarmupConfig warmupConfig ) { this( @@ -124,7 +125,14 @@ private DefaultStreamPoller( consumerFactory, consumerClientId, shardId, - new PartitionedBlockingQueueContainer(numProcessorThreads, shardId, ingestionEngine, errorStrategy, blockingQueueSize), + new PartitionedBlockingQueueContainer( + numProcessorThreads, + shardId, + ingestionEngine, + errorStrategy, + blockingQueueSize, + pipelineExecutor + ), resetState, resetValue, errorStrategy, @@ -732,6 +740,7 @@ public static class Builder { private long pointerBasedLagUpdateIntervalMs = 10000; private IngestionMessageMapper.MapperType mapperType = IngestionMessageMapper.MapperType.DEFAULT; private Map mapperSettings = Collections.emptyMap(); + private IngestPipelineExecutor pipelineExecutor; // Warmup configuration - default matches IndexMetadata settings private IngestionSource.WarmupConfig warmupConfig = new IngestionSource.WarmupConfig(TimeValue.timeValueMillis(-1), 100L); @@ -842,7 +851,14 @@ public Builder mapperSettings(Map mapperSettings) { } /** - * Set warmup enabled + * Set pipeline executor for ingest pipeline execution + */ + public Builder 
pipelineExecutor(IngestPipelineExecutor pipelineExecutor) { + this.pipelineExecutor = pipelineExecutor; + return this; + } + + /** * Set warmup configuration */ public Builder warmupConfig(IngestionSource.WarmupConfig warmupConfig) { @@ -871,6 +887,7 @@ public DefaultStreamPoller build() { pointerBasedLagUpdateIntervalMs, mapperType, mapperSettings, + pipelineExecutor, warmupConfig ); } diff --git a/server/src/main/java/org/opensearch/indices/pollingingest/IngestPipelineExecutor.java b/server/src/main/java/org/opensearch/indices/pollingingest/IngestPipelineExecutor.java new file mode 100644 index 0000000000000..d225474e5d108 --- /dev/null +++ b/server/src/main/java/org/opensearch/indices/pollingingest/IngestPipelineExecutor.java @@ -0,0 +1,172 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.indices.pollingingest; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.action.index.IndexRequest; +import org.opensearch.common.Nullable; +import org.opensearch.index.IndexSettings; +import org.opensearch.ingest.IngestService; +import org.opensearch.threadpool.ThreadPool; + +import java.util.Collections; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * Handles ingest pipeline resolution and execution for pull-based ingestion. + * + *
<p>
Resolves configured pipelines from index settings at initialization and executes them + * synchronously by bridging IngestService's async callback API with CompletableFuture. + * Also registers a dynamic settings listener to pick up runtime changes to {@code final_pipeline}. + * Only {@code final_pipeline} is supported. + * + *
<p>
Unlike push-based indexing, pipeline execution in pull-based ingestion does not require the + * node to have the {@code ingest} role. Transformations are executed locally on the node hosting the + * shard, and requests are not forwarded to dedicated ingest nodes. + */ +public class IngestPipelineExecutor { + + private static final Logger logger = LogManager.getLogger(IngestPipelineExecutor.class); + + // TODO: consider making this configurable via index settings if use cases with slow processors arise + static final long PIPELINE_EXECUTION_TIMEOUT_SECONDS = 30; + + private final IngestService ingestService; + private final String index; + private volatile String resolvedFinalPipeline; + + /** + * Creates an IngestPipelineExecutor for the given index. + * Resolves the final pipeline from index settings and registers a dynamic settings listener. + * + * @param ingestService the ingest service for pipeline execution + * @param index the index name + * @param indexSettings the index settings to resolve a pipeline from and register listener on + */ + public IngestPipelineExecutor(IngestService ingestService, String index, IndexSettings indexSettings) { + this.ingestService = Objects.requireNonNull(ingestService); + this.index = Objects.requireNonNull(index); + indexSettings.getScopedSettings().addSettingsUpdateConsumer(IndexSettings.FINAL_PIPELINE, this::updateFinalPipeline); + updateFinalPipeline(IndexSettings.FINAL_PIPELINE.get(indexSettings.getSettings())); + } + + /** + * Visible for testing. Creates an executor with a pre-resolved pipeline name, + * bypassing resolution from index settings. 
+ * + * @param ingestService the ingest service + * @param index the index name + * @param finalPipeline the resolved final pipeline name, or null if no pipeline is configured + */ + IngestPipelineExecutor(IngestService ingestService, String index, @Nullable String finalPipeline) { + this.ingestService = Objects.requireNonNull(ingestService); + this.index = Objects.requireNonNull(index); + this.resolvedFinalPipeline = finalPipeline; + } + + /** + * Updates the cached final pipeline name. Called on initial resolution and on dynamic settings change. + */ + void updateFinalPipeline(String finalPipeline) { + if (IngestService.NOOP_PIPELINE_NAME.equals(finalPipeline)) { + resolvedFinalPipeline = null; + } else { + resolvedFinalPipeline = finalPipeline; + } + } + + /** + * Executes final_pipeline on the source map synchronously using CompletableFuture to bridge + * IngestService's async callback API. + * + * @param id document ID + * @param sourceMap source map to transform + * @return the transformed source map, or null if the document was dropped by the pipeline + * @throws Exception if pipeline execution fails + */ + public Map executePipelines(String id, Map sourceMap) throws Exception { + final String finalPipeline = resolvedFinalPipeline; + if (finalPipeline == null) { + return sourceMap; + } + + // Build IndexRequest to carry the document through the pipeline + IndexRequest indexRequest = new IndexRequest(index); + indexRequest.id(id); + indexRequest.source(sourceMap); + + indexRequest.setPipeline(IngestService.NOOP_PIPELINE_NAME); + indexRequest.setFinalPipeline(finalPipeline); + indexRequest.isPipelineResolved(true); + + final String originalId = id; + final String originalRouting = indexRequest.routing(); + + CompletableFuture future = new CompletableFuture<>(); + AtomicBoolean dropped = new AtomicBoolean(false); + + ingestService.executeBulkRequest( + 1, + Collections.singletonList(indexRequest), + (slot, e) -> future.completeExceptionally(e), + (thread, e) -> 
{ + if (e != null) { + future.completeExceptionally(e); + } else { + future.complete(null); + } + }, + slot -> dropped.set(true), + ThreadPool.Names.WRITE + ); + + // Block until pipeline execution completes (with timeout) + try { + future.get(PIPELINE_EXECUTION_TIMEOUT_SECONDS, TimeUnit.SECONDS); + } catch (TimeoutException e) { + throw new RuntimeException("Ingest pipeline execution timed out after [" + PIPELINE_EXECUTION_TIMEOUT_SECONDS + "] seconds", e); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Ingest pipeline execution was interrupted", e); + } catch (ExecutionException e) { + throw new RuntimeException("Ingest pipeline execution failed", e.getCause()); + } + + if (dropped.get()) { + return null; + } + + // verify _id and _routing have not been mutated + if (Objects.equals(originalId, indexRequest.id()) == false) { + throw new IllegalStateException( + "Ingest pipeline attempted to change _id from [" + + originalId + + "] to [" + + indexRequest.id() + + "]. _id mutations are not allowed in pull-based ingestion." + ); + } + if (Objects.equals(originalRouting, indexRequest.routing()) == false) { + throw new IllegalStateException( + "Ingest pipeline attempted to change _routing. _routing mutations are not allowed in pull-based ingestion." 
+ ); + } + + // _index change is already blocked by final_pipeline semantics in IngestService + + return indexRequest.sourceAsMap(); + } +} diff --git a/server/src/main/java/org/opensearch/indices/pollingingest/IngestionEngineFactory.java b/server/src/main/java/org/opensearch/indices/pollingingest/IngestionEngineFactory.java index 60f5ba215ad52..01455c8b653a7 100644 --- a/server/src/main/java/org/opensearch/indices/pollingingest/IngestionEngineFactory.java +++ b/server/src/main/java/org/opensearch/indices/pollingingest/IngestionEngineFactory.java @@ -9,7 +9,6 @@ package org.opensearch.indices.pollingingest; import org.opensearch.cluster.metadata.IngestionSource; -import org.opensearch.common.Nullable; import org.opensearch.index.IngestionConsumerFactory; import org.opensearch.index.engine.Engine; import org.opensearch.index.engine.EngineConfig; @@ -27,12 +26,11 @@ public class IngestionEngineFactory implements EngineFactory { private final IngestionConsumerFactory ingestionConsumerFactory; - @Nullable private final Supplier ingestServiceSupplier; public IngestionEngineFactory(IngestionConsumerFactory ingestionConsumerFactory, Supplier ingestServiceSupplier) { this.ingestionConsumerFactory = Objects.requireNonNull(ingestionConsumerFactory); - this.ingestServiceSupplier = ingestServiceSupplier; + this.ingestServiceSupplier = Objects.requireNonNull(ingestServiceSupplier); } /** @@ -45,9 +43,8 @@ public Engine newReadWriteEngine(EngineConfig config) { IngestionSource ingestionSource = config.getIndexSettings().getIndexMetadata().getIngestionSource(); boolean isAllActiveIngestion = ingestionSource != null && ingestionSource.isAllActiveIngestionEnabled(); - IngestService ingestService = ingestServiceSupplier != null ? ingestServiceSupplier.get() : null; - assert ingestService != null || ingestServiceSupplier == null - : "IngestService supplier returned null. 
This indicates a initialization ordering issue."; + IngestService ingestService = ingestServiceSupplier.get(); + assert ingestService != null : "IngestService supplier returned null. This indicates a initialization ordering issue."; if (isAllActiveIngestion) { // use ingestion engine on both primary and replica in all-active mode diff --git a/server/src/main/java/org/opensearch/indices/pollingingest/MessageProcessorRunnable.java b/server/src/main/java/org/opensearch/indices/pollingingest/MessageProcessorRunnable.java index ddbe42c186494..5c4c89d5484fb 100644 --- a/server/src/main/java/org/opensearch/indices/pollingingest/MessageProcessorRunnable.java +++ b/server/src/main/java/org/opensearch/indices/pollingingest/MessageProcessorRunnable.java @@ -45,8 +45,8 @@ import static org.opensearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO; /** - * A class to process messages from the ingestion stream. It extracts the payload from the message and creates an - * engine operation. + * A class to process messages from the ingestion stream. It extracts the payload from the message and creates an + * engine operation. */ public class MessageProcessorRunnable implements Runnable, Closeable { public static final String ID = "_id"; @@ -73,18 +73,20 @@ public class MessageProcessorRunnable implements Runnable, Closeable { /** * Constructor. 
* - * @param blockingQueue the blocking queue to poll messages from - * @param engine the ingestion engine - * @param errorStrategy the error strategy/policy to use + * @param blockingQueue the blocking queue to poll messages from + * @param engine the ingestion engine + * @param errorStrategy the error strategy/policy to use + * @param pipelineExecutor the pipeline executor for ingest pipeline execution */ public MessageProcessorRunnable( BlockingQueue> blockingQueue, IngestionEngine engine, - IngestionErrorStrategy errorStrategy + IngestionErrorStrategy errorStrategy, + IngestPipelineExecutor pipelineExecutor ) { this( blockingQueue, - new MessageProcessor(engine), + new MessageProcessor(engine, pipelineExecutor), errorStrategy, engine.config().getShardId().getIndexName(), engine.config().getShardId().getId() @@ -93,11 +95,12 @@ public MessageProcessorRunnable( /** * Constructor visible for testing. - * @param blockingQueue the blocking queue to poll messages from + * + * @param blockingQueue the blocking queue to poll messages from * @param messageProcessor the message processor - * @param errorStrategy the error strategy/policy to use - * @param indexName the index name - * @param shardId the shard ID + * @param errorStrategy the error strategy/policy to use + * @param indexName the index name + * @param shardId the shard ID */ MessageProcessorRunnable( BlockingQueue> blockingQueue, @@ -116,28 +119,34 @@ public MessageProcessorRunnable( static class MessageProcessor { private final IngestionEngine engine; private final String index; + private final IngestPipelineExecutor pipelineExecutor; - MessageProcessor(IngestionEngine engine) { - this(engine, engine.config().getIndexSettings().getIndex().getName()); + MessageProcessor(IngestionEngine engine, IngestPipelineExecutor pipelineExecutor) { + this.engine = engine; + this.index = engine.config().getIndexSettings().getIndex().getName(); + this.pipelineExecutor = pipelineExecutor; } /** - * visible for testing - * 
@param engine the ingestion engine - * @param index the index name + * Visible for testing. + * + * @param engine the ingestion engine + * @param index the index name + * @param pipelineExecutor the pipeline executor for ingest pipeline execution */ - MessageProcessor(IngestionEngine engine, String index) { + MessageProcessor(IngestionEngine engine, String index, IngestPipelineExecutor pipelineExecutor) { this.engine = engine; this.index = index; + this.pipelineExecutor = pipelineExecutor; } /** * Visible for testing. Process the message and create an engine operation. - * + *
<p>
* Process the message and create an engine operation. It also records the offset in the document as (1) a point * field used for range search, (2) a stored field for retrieval. * - * @param shardUpdateMessage contains the message to process + * @param shardUpdateMessage contains the message to process * @param messageProcessorMetrics message processor metrics */ protected void process(ShardUpdateMessage shardUpdateMessage, MessageProcessorMetrics messageProcessorMetrics) { @@ -169,10 +178,12 @@ protected void process(ShardUpdateMessage shardUpdateMessage, MessageProcessorMe /** * Visible for testing. Get the engine operation from the message. - * @param shardUpdateMessage an update message containing payload and pointer for the update + * + * @param shardUpdateMessage an update message containing payload and pointer for the update * @param messageProcessorMetrics message processor metrics * @return the message operation */ + @SuppressWarnings("unchecked") protected MessageOperation getOperation(ShardUpdateMessage shardUpdateMessage, MessageProcessorMetrics messageProcessorMetrics) throws IOException { Map payloadMap = shardUpdateMessage.parsedPayloadMap(); @@ -226,7 +237,28 @@ protected MessageOperation getOperation(ShardUpdateMessage shardUpdateMessage, M throw new IllegalArgumentException(errorMessage); } - BytesReference source = convertToBytes(payloadMap.get(SOURCE)); + Map sourceMap = (Map) payloadMap.get(SOURCE); + + // Execute ingest pipelines + try { + Map transformedSource = pipelineExecutor.executePipelines(id, sourceMap); + if (transformedSource == null) { + // Document dropped by pipeline + operation = new Engine.NoOp( + 0, + 1, + Engine.Operation.Origin.PRIMARY, + System.nanoTime(), + "Document dropped by ingest pipeline" + ); + return new MessageOperation(operation, opType); + } + sourceMap = transformedSource; + } catch (Exception e) { + throw new RuntimeException("Ingest pipeline execution failed", e); + } + + BytesReference source = 
convertToBytes(sourceMap); SourceToParse sourceToParse = new SourceToParse(index, id, source, MediaTypeRegistry.xContentType(source), null); ParsedDocument doc = engine.getDocumentMapperForType().getDocumentMapper().parse(sourceToParse); ParseContext.Document document = doc.rootDoc(); diff --git a/server/src/main/java/org/opensearch/indices/pollingingest/PartitionedBlockingQueueContainer.java b/server/src/main/java/org/opensearch/indices/pollingingest/PartitionedBlockingQueueContainer.java index 1c76d4e1973c3..726298a5324dc 100644 --- a/server/src/main/java/org/opensearch/indices/pollingingest/PartitionedBlockingQueueContainer.java +++ b/server/src/main/java/org/opensearch/indices/pollingingest/PartitionedBlockingQueueContainer.java @@ -49,7 +49,8 @@ public PartitionedBlockingQueueContainer( int shardId, IngestionEngine ingestionEngine, IngestionErrorStrategy errorStrategy, - int blockingQueueSize + int blockingQueueSize, + IngestPipelineExecutor pipelineExecutor ) { assert numPartitions > 0 : "Number of processor threads / partitions must be greater than 0"; partitionToQueueMap = new ConcurrentHashMap<>(); @@ -76,7 +77,8 @@ public PartitionedBlockingQueueContainer( MessageProcessorRunnable messageProcessorRunnable = new MessageProcessorRunnable( partitionToQueueMap.get(partition), ingestionEngine, - errorStrategy + errorStrategy, + pipelineExecutor ); partitionToMessageProcessorMap.put(partition, messageProcessorRunnable); } diff --git a/server/src/test/java/org/opensearch/index/engine/IngestionEngineTests.java b/server/src/test/java/org/opensearch/index/engine/IngestionEngineTests.java index 922881f6499b2..1b8e1fb48734f 100644 --- a/server/src/test/java/org/opensearch/index/engine/IngestionEngineTests.java +++ b/server/src/test/java/org/opensearch/index/engine/IngestionEngineTests.java @@ -162,7 +162,7 @@ public void testCreationFailure() throws IOException { MapperService mapperService = createMapperService(mapping); engineConfig = config(engineConfig, () -> new 
DocumentMapperForType(mapperService.documentMapper(), null), clusterApplierService); try { - new IngestionEngine(engineConfig, consumerFactory); + new IngestionEngine(engineConfig, consumerFactory, mock(IngestService.class)); fail("Expected EngineException to be thrown"); } catch (EngineException e) { assertEquals("failed to create engine", e.getMessage()); @@ -251,7 +251,7 @@ private IngestionEngine buildIngestionEngine( ); store.associateIndexWithNewTranslog(translogUuid); } - IngestionEngine ingestionEngine = new IngestionEngine(engineConfig, consumerFactory); + IngestionEngine ingestionEngine = new IngestionEngine(engineConfig, consumerFactory, mock(IngestService.class)); ingestionEngine.start(); return ingestionEngine; } @@ -267,33 +267,6 @@ private boolean resultsFound(Engine engine, int numDocs) { } } - public void testConstructorWithNullIngestService() throws IOException { - final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); - Store testStore = createStore(indexSettings, newDirectory()); - FakeIngestionSource.FakeIngestionConsumerFactory consumerFactory = new FakeIngestionSource.FakeIngestionConsumerFactory(messages); - - EngineConfig config = config(indexSettings, testStore, createTempDir(), NoMergePolicy.INSTANCE, null, null, globalCheckpoint::get); - String mapping = "{\"properties\":{\"name\":{\"type\": \"text\"},\"age\":{\"type\": \"integer\"}}}}"; - MapperService mapperService = createMapperService(mapping); - config = config(config, () -> new DocumentMapperForType(mapperService.documentMapper(), null), clusterApplierService); - - testStore.createEmpty(config.getIndexSettings().getIndexVersionCreated().luceneVersion); - final String translogUuid = Translog.createEmptyTranslog( - config.getTranslogConfig().getTranslogPath(), - SequenceNumbers.NO_OPS_PERFORMED, - shardId, - primaryTerm.get() - ); - testStore.associateIndexWithNewTranslog(translogUuid); - - // null IngestService — engine should start without pipeline 
support - IngestionEngine engine = new IngestionEngine(config, consumerFactory, null); - engine.start(); - waitForResults(engine, 2); - engine.close(); - testStore.close(); - } - public void testConstructorWithNonNullIngestService() throws IOException { final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); Store testStore = createStore(indexSettings, newDirectory()); diff --git a/server/src/test/java/org/opensearch/indices/pollingingest/IngestPipelineExecutorTests.java b/server/src/test/java/org/opensearch/indices/pollingingest/IngestPipelineExecutorTests.java new file mode 100644 index 0000000000000..61ea51c74fcc6 --- /dev/null +++ b/server/src/test/java/org/opensearch/indices/pollingingest/IngestPipelineExecutorTests.java @@ -0,0 +1,194 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.indices.pollingingest; + +import org.opensearch.action.DocWriteRequest; +import org.opensearch.action.index.IndexRequest; +import org.opensearch.ingest.IngestService; +import org.opensearch.test.OpenSearchTestCase; + +import java.util.HashMap; +import java.util.Map; +import java.util.function.BiConsumer; +import java.util.function.IntConsumer; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; + +public class IngestPipelineExecutorTests extends OpenSearchTestCase { + + private IngestService ingestService; + + @Override + public void setUp() throws Exception { + super.setUp(); + ingestService = mock(IngestService.class); + } + + // --- Construction tests --- + + public void 
testConstructorRequiresNonNullIngestService() { + expectThrows(NullPointerException.class, () -> new IngestPipelineExecutor(null, "test_index", (String) null)); + } + + public void testConstructorRequiresNonNullIndex() { + expectThrows(NullPointerException.class, () -> new IngestPipelineExecutor(ingestService, null, (String) null)); + } + + // --- Execution: no pipeline configured --- + + public void testExecutePipelines_NoPipeline_ReturnsSourceUnchanged() throws Exception { + IngestPipelineExecutor executor = new IngestPipelineExecutor(ingestService, "test_index", (String) null); + + Map source = new HashMap<>(); + source.put("name", "alice"); + + Map result = executor.executePipelines("1", source); + + assertSame(source, result); + verify(ingestService, never()).executeBulkRequest(anyInt(), any(), any(), any(), any(), anyString()); + } + + // --- Execution: pipeline transforms source --- + + public void testExecutePipelines_TransformsSource() throws Exception { + mockPipelineExecution("added_field", "added_value"); + IngestPipelineExecutor executor = new IngestPipelineExecutor(ingestService, "test_index", "my-pipeline"); + + Map source = new HashMap<>(); + source.put("name", "alice"); + + Map result = executor.executePipelines("1", source); + + assertNotNull(result); + verify(ingestService).executeBulkRequest(anyInt(), any(), any(), any(), any(), anyString()); + } + + // --- Execution: pipeline drops document --- + + public void testExecutePipelines_DropsDocument() throws Exception { + doAnswer(invocation -> { + IntConsumer onDropped = invocation.getArgument(4); + BiConsumer onCompletion = invocation.getArgument(3); + onDropped.accept(0); + onCompletion.accept(Thread.currentThread(), null); + return null; + }).when(ingestService).executeBulkRequest(anyInt(), any(), any(), any(), any(), anyString()); + + IngestPipelineExecutor executor = new IngestPipelineExecutor(ingestService, "test_index", "drop-pipeline"); + + Map source = new HashMap<>(); + source.put("name", 
"alice"); + + Map result = executor.executePipelines("1", source); + + assertNull(result); + } + + // --- Execution: pipeline failure --- + + public void testExecutePipelines_Failure() { + doAnswer(invocation -> { + BiConsumer onFailure = invocation.getArgument(2); + BiConsumer onCompletion = invocation.getArgument(3); + onFailure.accept(0, new RuntimeException("processor failed")); + onCompletion.accept(Thread.currentThread(), null); + return null; + }).when(ingestService).executeBulkRequest(anyInt(), any(), any(), any(), any(), anyString()); + + IngestPipelineExecutor executor = new IngestPipelineExecutor(ingestService, "test_index", "fail-pipeline"); + + Map source = new HashMap<>(); + source.put("name", "alice"); + + Exception e = expectThrows(RuntimeException.class, () -> executor.executePipelines("1", source)); + assertTrue(e.getMessage().contains("Ingest pipeline execution failed")); + assertTrue(e.getCause().getMessage().contains("processor failed")); + } + + public void testExecutePipelines_CompletionException() { + doAnswer(invocation -> { + BiConsumer onCompletion = invocation.getArgument(3); + onCompletion.accept(Thread.currentThread(), new RuntimeException("bulk execution failed")); + return null; + }).when(ingestService).executeBulkRequest(anyInt(), any(), any(), any(), any(), anyString()); + + IngestPipelineExecutor executor = new IngestPipelineExecutor(ingestService, "test_index", "fail-pipeline"); + + Map source = new HashMap<>(); + source.put("name", "alice"); + + RuntimeException e = expectThrows(RuntimeException.class, () -> executor.executePipelines("1", source)); + assertTrue(e.getMessage().contains("Ingest pipeline execution failed")); + } + + // --- Guardrails --- + + public void testExecutePipelines_IdMutation_Throws() { + doAnswer(invocation -> { + Iterable> requests = invocation.getArgument(1); + BiConsumer onCompletion = invocation.getArgument(3); + for (DocWriteRequest req : requests) { + ((IndexRequest) req).id("changed_id"); + } + 
onCompletion.accept(Thread.currentThread(), null); + return null; + }).when(ingestService).executeBulkRequest(anyInt(), any(), any(), any(), any(), anyString()); + + IngestPipelineExecutor executor = new IngestPipelineExecutor(ingestService, "test_index", "mutate-pipeline"); + + Map source = new HashMap<>(); + source.put("name", "alice"); + + IllegalStateException e = expectThrows(IllegalStateException.class, () -> executor.executePipelines("1", source)); + assertTrue(e.getMessage().contains("_id mutations are not allowed")); + } + + public void testExecutePipelines_RoutingMutation_Throws() { + doAnswer(invocation -> { + Iterable> requests = invocation.getArgument(1); + BiConsumer onCompletion = invocation.getArgument(3); + for (DocWriteRequest req : requests) { + ((IndexRequest) req).routing("new_routing"); + } + onCompletion.accept(Thread.currentThread(), null); + return null; + }).when(ingestService).executeBulkRequest(anyInt(), any(), any(), any(), any(), anyString()); + + IngestPipelineExecutor executor = new IngestPipelineExecutor(ingestService, "test_index", "mutate-pipeline"); + + Map source = new HashMap<>(); + source.put("name", "alice"); + + IllegalStateException e = expectThrows(IllegalStateException.class, () -> executor.executePipelines("1", source)); + assertTrue(e.getMessage().contains("_routing mutations are not allowed")); + } + + // --- Helper --- + + private void mockPipelineExecution(String addedField, Object addedValue) { + doAnswer(invocation -> { + Iterable> requests = invocation.getArgument(1); + BiConsumer onCompletion = invocation.getArgument(3); + for (DocWriteRequest req : requests) { + IndexRequest indexRequest = (IndexRequest) req; + Map sourceMap = indexRequest.sourceAsMap(); + sourceMap.put(addedField, addedValue); + indexRequest.source(sourceMap); + } + onCompletion.accept(Thread.currentThread(), null); + return null; + }).when(ingestService).executeBulkRequest(anyInt(), any(), any(), any(), any(), anyString()); + } +} diff --git 
a/server/src/test/java/org/opensearch/indices/pollingingest/IngestionEngineFactoryTests.java b/server/src/test/java/org/opensearch/indices/pollingingest/IngestionEngineFactoryTests.java index be342516716b9..8c4aa4456d40f 100644 --- a/server/src/test/java/org/opensearch/indices/pollingingest/IngestionEngineFactoryTests.java +++ b/server/src/test/java/org/opensearch/indices/pollingingest/IngestionEngineFactoryTests.java @@ -27,8 +27,6 @@ public void testConstructorWithIngestServiceSupplier() { public void testConstructorWithNullIngestServiceSupplier() { IngestionConsumerFactory consumerFactory = mock(IngestionConsumerFactory.class); - // Null supplier should be accepted - IngestionEngineFactory factory = new IngestionEngineFactory(consumerFactory, null); - assertNotNull(factory); + expectThrows(NullPointerException.class, () -> new IngestionEngineFactory(consumerFactory, null)); } } diff --git a/server/src/test/java/org/opensearch/indices/pollingingest/MessageProcessorTests.java b/server/src/test/java/org/opensearch/indices/pollingingest/MessageProcessorTests.java index ce1fc7830ea34..a12164f071fee 100644 --- a/server/src/test/java/org/opensearch/indices/pollingingest/MessageProcessorTests.java +++ b/server/src/test/java/org/opensearch/indices/pollingingest/MessageProcessorTests.java @@ -9,6 +9,7 @@ package org.opensearch.indices.pollingingest; import org.opensearch.action.DocWriteRequest; +import org.opensearch.action.index.IndexRequest; import org.opensearch.index.IngestionShardPointer; import org.opensearch.index.Message; import org.opensearch.index.engine.Engine; @@ -19,6 +20,7 @@ import org.opensearch.index.mapper.ParseContext; import org.opensearch.index.mapper.ParsedDocument; import org.opensearch.index.mapper.SourceToParse; +import org.opensearch.ingest.IngestService; import org.opensearch.test.OpenSearchTestCase; import org.junit.Before; @@ -27,12 +29,18 @@ import java.util.Collections; import java.util.concurrent.ArrayBlockingQueue; import 
java.util.concurrent.TimeUnit; +import java.util.function.BiConsumer; +import java.util.function.IntConsumer; import org.mockito.ArgumentCaptor; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -52,7 +60,12 @@ public void setUp() throws Exception { documentMapper = mock(DocumentMapper.class); when(documentMapperForType.getDocumentMapper()).thenReturn(documentMapper); - processor = new MessageProcessorRunnable.MessageProcessor(ingestionEngine, "index"); + // No pipeline configured — executor with null finalPipeline (pre-resolved, skips lazy resolution) + processor = new MessageProcessorRunnable.MessageProcessor( + ingestionEngine, + "index", + new IngestPipelineExecutor(mock(IngestService.class), "index", (String) null) + ); } public void testGetIndexOperation() throws IOException { @@ -272,4 +285,191 @@ public void testDropPolicyMessageRetryFail() throws Exception { messageProcessorRunnable.close(); thread.interrupt(); } + + // --- Pipeline execution tests --- + + /** + * Creates a MessageProcessor with a mocked IngestService and index settings that have a final_pipeline configured. 
+ */ + private MessageProcessorRunnable.MessageProcessor createProcessorWithPipeline(IngestService ingestService, String finalPipeline) + throws Exception { + IngestionEngine engine = mock(IngestionEngine.class); + DocumentMapperForType dmft = mock(DocumentMapperForType.class); + DocumentMapper dm = mock(DocumentMapper.class); + when(engine.getDocumentMapperForType()).thenReturn(dmft); + when(dmft.getDocumentMapper()).thenReturn(dm); + + ParsedDocument parsedDoc = mock(ParsedDocument.class); + when(parsedDoc.rootDoc()).thenReturn(new ParseContext.Document()); + when(dm.parse(any())).thenReturn(parsedDoc); + + // Use IngestPipelineExecutor with pre-resolved pipeline name + String resolvedPipeline = "_none".equals(finalPipeline) ? null : finalPipeline; + IngestPipelineExecutor pipelineExecutor = new IngestPipelineExecutor(ingestService, "test_index", resolvedPipeline); + return new MessageProcessorRunnable.MessageProcessor(engine, "test_index", pipelineExecutor); + } + + /** + * Mocks IngestService.executeBulkRequest to simulate successful pipeline execution + * that modifies the document source (adds a field). 
+ */ + private void mockPipelineExecution(IngestService ingestService, String addedField, Object addedValue) { + doAnswer(invocation -> { + Iterable> requests = invocation.getArgument(1); + BiConsumer onCompletion = invocation.getArgument(3); + + // Simulate pipeline adding a field to the source + for (DocWriteRequest req : requests) { + IndexRequest indexRequest = (IndexRequest) req; + java.util.Map sourceMap = indexRequest.sourceAsMap(); + sourceMap.put(addedField, addedValue); + indexRequest.source(sourceMap); + } + + onCompletion.accept(Thread.currentThread(), null); + return null; + }).when(ingestService).executeBulkRequest(anyInt(), any(), any(), any(), any(), anyString()); + } + + public void testProcessWithFinalPipeline() throws Exception { + IngestService ingestService = mock(IngestService.class); + mockPipelineExecution(ingestService, "pipeline_processed", true); + + MessageProcessorRunnable.MessageProcessor proc = createProcessorWithPipeline(ingestService, "test-pipeline"); + + byte[] payload = "{\"_id\":\"1\",\"_source\":{\"name\":\"alice\"}}".getBytes(StandardCharsets.UTF_8); + FakeIngestionSource.FakeIngestionShardPointer pointer = new FakeIngestionSource.FakeIngestionShardPointer(0); + + MessageProcessorRunnable.MessageOperation operation = proc.getOperation( + new ShardUpdateMessage(pointer, mock(Message.class), IngestionUtils.getParsedPayloadMap(payload), -1), + MessageProcessorRunnable.MessageProcessorMetrics.create() + ); + + assertTrue(operation.engineOperation() instanceof Engine.Index); + + // Verify IngestService was called + verify(ingestService).executeBulkRequest(anyInt(), any(), any(), any(), any(), anyString()); + } + + public void testProcessWithNoPipelines() throws Exception { + IngestService ingestService = mock(IngestService.class); + + // final_pipeline = _none → no pipeline configured + MessageProcessorRunnable.MessageProcessor proc = createProcessorWithPipeline(ingestService, "_none"); + + byte[] payload = 
"{\"_id\":\"1\",\"_source\":{\"name\":\"alice\"}}".getBytes(StandardCharsets.UTF_8); + FakeIngestionSource.FakeIngestionShardPointer pointer = new FakeIngestionSource.FakeIngestionShardPointer(0); + + MessageProcessorRunnable.MessageOperation operation = proc.getOperation( + new ShardUpdateMessage(pointer, mock(Message.class), IngestionUtils.getParsedPayloadMap(payload), -1), + MessageProcessorRunnable.MessageProcessorMetrics.create() + ); + + assertTrue(operation.engineOperation() instanceof Engine.Index); + + // IngestService should NOT be called when no pipelines configured + verify(ingestService, never()).executeBulkRequest(anyInt(), any(), any(), any(), any(), anyString()); + } + + public void testPipelineDropsDocument() throws Exception { + IngestService ingestService = mock(IngestService.class); + + // Mock pipeline that drops the document + doAnswer(invocation -> { + IntConsumer onDropped = invocation.getArgument(4); + BiConsumer onCompletion = invocation.getArgument(3); + onDropped.accept(0); + onCompletion.accept(Thread.currentThread(), null); + return null; + }).when(ingestService).executeBulkRequest(anyInt(), any(), any(), any(), any(), anyString()); + + MessageProcessorRunnable.MessageProcessor proc = createProcessorWithPipeline(ingestService, "drop-pipeline"); + + byte[] payload = "{\"_id\":\"1\",\"_source\":{\"name\":\"alice\"}}".getBytes(StandardCharsets.UTF_8); + FakeIngestionSource.FakeIngestionShardPointer pointer = new FakeIngestionSource.FakeIngestionShardPointer(0); + + MessageProcessorRunnable.MessageOperation operation = proc.getOperation( + new ShardUpdateMessage(pointer, mock(Message.class), IngestionUtils.getParsedPayloadMap(payload), -1), + MessageProcessorRunnable.MessageProcessorMetrics.create() + ); + + // Dropped documents should return NoOp + assertTrue(operation.engineOperation() instanceof Engine.NoOp); + } + + public void testPipelineMutatesId_Throws() throws Exception { + IngestService ingestService = mock(IngestService.class); 
+ + // Mock pipeline that changes _id + doAnswer(invocation -> { + Iterable> requests = invocation.getArgument(1); + BiConsumer onCompletion = invocation.getArgument(3); + for (DocWriteRequest req : requests) { + ((IndexRequest) req).id("changed_id"); + } + onCompletion.accept(Thread.currentThread(), null); + return null; + }).when(ingestService).executeBulkRequest(anyInt(), any(), any(), any(), any(), anyString()); + + MessageProcessorRunnable.MessageProcessor proc = createProcessorWithPipeline(ingestService, "mutate-id-pipeline"); + + byte[] payload = "{\"_id\":\"1\",\"_source\":{\"name\":\"alice\"}}".getBytes(StandardCharsets.UTF_8); + FakeIngestionSource.FakeIngestionShardPointer pointer = new FakeIngestionSource.FakeIngestionShardPointer(0); + + RuntimeException e = assertThrows( + RuntimeException.class, + () -> proc.getOperation( + new ShardUpdateMessage(pointer, mock(Message.class), IngestionUtils.getParsedPayloadMap(payload), -1), + MessageProcessorRunnable.MessageProcessorMetrics.create() + ) + ); + assertTrue(e.getCause() instanceof IllegalStateException); + assertTrue(e.getCause().getMessage().contains("_id mutations are not allowed")); + } + + public void testPipelineFailure() throws Exception { + IngestService ingestService = mock(IngestService.class); + + // Mock pipeline that fails + doAnswer(invocation -> { + BiConsumer onFailure = invocation.getArgument(2); + BiConsumer onCompletion = invocation.getArgument(3); + onFailure.accept(0, new RuntimeException("Pipeline processor failed")); + onCompletion.accept(Thread.currentThread(), null); + return null; + }).when(ingestService).executeBulkRequest(anyInt(), any(), any(), any(), any(), anyString()); + + MessageProcessorRunnable.MessageProcessor proc = createProcessorWithPipeline(ingestService, "fail-pipeline"); + + byte[] payload = "{\"_id\":\"1\",\"_source\":{\"name\":\"alice\"}}".getBytes(StandardCharsets.UTF_8); + FakeIngestionSource.FakeIngestionShardPointer pointer = new 
FakeIngestionSource.FakeIngestionShardPointer(0); + + RuntimeException e = assertThrows( + RuntimeException.class, + () -> proc.getOperation( + new ShardUpdateMessage(pointer, mock(Message.class), IngestionUtils.getParsedPayloadMap(payload), -1), + MessageProcessorRunnable.MessageProcessorMetrics.create() + ) + ); + assertTrue(e.getCause().getMessage().contains("Ingest pipeline execution failed")); + } + + public void testPipelineNotCalledForDeleteOperations() throws Exception { + IngestService ingestService = mock(IngestService.class); + + MessageProcessorRunnable.MessageProcessor proc = createProcessorWithPipeline(ingestService, "test-pipeline"); + + byte[] payload = "{\"_id\":\"1\",\"_op_type\":\"delete\"}".getBytes(StandardCharsets.UTF_8); + FakeIngestionSource.FakeIngestionShardPointer pointer = new FakeIngestionSource.FakeIngestionShardPointer(0); + + MessageProcessorRunnable.MessageOperation operation = proc.getOperation( + new ShardUpdateMessage(pointer, mock(Message.class), IngestionUtils.getParsedPayloadMap(payload), -1), + MessageProcessorRunnable.MessageProcessorMetrics.create() + ); + + assertTrue(operation.engineOperation() instanceof Engine.Delete); + + // Pipeline should NOT be called for delete operations + verify(ingestService, never()).executeBulkRequest(anyInt(), any(), any(), any(), any(), anyString()); + } }