diff --git a/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/RerouteProcessorTests.java b/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/RerouteProcessorTests.java index d95f3eefcc87d..5aadcd825afaf 100644 --- a/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/RerouteProcessorTests.java +++ b/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/RerouteProcessorTests.java @@ -20,6 +20,7 @@ import java.util.Map; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; public class RerouteProcessorTests extends ESTestCase { @@ -29,6 +30,7 @@ public void testDefaults() throws Exception { RerouteProcessor processor = createRerouteProcessor(List.of(), List.of()); processor.execute(ingestDocument); assertDataSetFields(ingestDocument, "logs", "generic", "default"); + assertThat(ingestDocument.isReroute(), is(true)); } public void testEventDataset() throws Exception { @@ -38,6 +40,7 @@ public void testEventDataset() throws Exception { RerouteProcessor processor = createRerouteProcessor(List.of("{{event.dataset}}"), List.of()); processor.execute(ingestDocument); assertDataSetFields(ingestDocument, "logs", "foo", "default"); + assertThat(ingestDocument.isReroute(), is(true)); assertThat(ingestDocument.getFieldValue("event.dataset", String.class), equalTo("foo")); } @@ -48,6 +51,7 @@ public void testEventDatasetDottedFieldName() throws Exception { RerouteProcessor processor = createRerouteProcessor(List.of("{{event.dataset}}"), List.of()); processor.execute(ingestDocument); assertDataSetFields(ingestDocument, "logs", "foo", "default"); + assertThat(ingestDocument.isReroute(), is(true)); assertThat(ingestDocument.getCtxMap().get("event.dataset"), equalTo("foo")); assertFalse(ingestDocument.getCtxMap().containsKey("event")); } @@ -59,6 +63,7 @@ public void testNoDataset() throws Exception { RerouteProcessor processor = createRerouteProcessor(List.of("{{ds}}"), List.of()); processor.execute(ingestDocument); assertDataSetFields(ingestDocument, "logs", "foo", "default"); + assertThat(ingestDocument.isReroute(), is(true)); assertFalse(ingestDocument.hasField("event.dataset")); } @@ -70,6 +75,7 @@ public void testSkipFirstProcessor() throws Exception { CompoundProcessor processor = new CompoundProcessor(new SkipProcessor(skippedProcessor), executedProcessor); processor.execute(ingestDocument); assertDataSetFields(ingestDocument, "logs", "executed", "default"); + assertThat(ingestDocument.isReroute(), is(true)); } public void testSkipLastProcessor() throws Exception { @@ -80,6 +86,7 @@ public void testSkipLastProcessor() throws Exception { CompoundProcessor processor = new CompoundProcessor(executedProcessor, skippedProcessor); processor.execute(ingestDocument); assertDataSetFields(ingestDocument, "logs", "executed", "default"); + assertThat(ingestDocument.isReroute(), is(true)); } public void testDataStreamFieldsFromDocument() throws Exception { @@ -90,6 +97,7 @@ public void testDataStreamFieldsFromDocument() throws Exception { RerouteProcessor processor = createRerouteProcessor(List.of(), List.of()); processor.execute(ingestDocument); assertDataSetFields(ingestDocument, "logs", "foo", "bar"); + assertThat(ingestDocument.isReroute(), is(true)); } public void testDataStreamFieldsFromDocumentDottedNotation() throws Exception { @@ -101,6 +109,7 @@ public void testDataStreamFieldsFromDocumentDottedNotation() throws Exception { RerouteProcessor processor = createRerouteProcessor(List.of(), List.of()); processor.execute(ingestDocument); assertDataSetFields(ingestDocument, "logs", "foo", "bar"); + assertThat(ingestDocument.isReroute(), is(true)); } public void testInvalidDataStreamFieldsFromDocument() throws Exception { @@ -111,6 +120,7 @@ public void testInvalidDataStreamFieldsFromDocument() throws Exception { RerouteProcessor processor = createRerouteProcessor(List.of(), List.of()); processor.execute(ingestDocument); assertDataSetFields(ingestDocument, "logs", "foo_bar", "baz_qux"); + assertThat(ingestDocument.isReroute(), is(true)); } public void testDestination() throws Exception { @@ -120,6 +130,7 @@ public void testDestination() throws Exception { processor.execute(ingestDocument); assertFalse(ingestDocument.hasField("data_stream")); assertThat(ingestDocument.getFieldValue("_index", String.class), equalTo("foo")); + assertThat(ingestDocument.isReroute(), is(true)); } public void testFieldReference() throws Exception { @@ -130,6 +141,7 @@ public void testFieldReference() throws Exception { RerouteProcessor processor = createRerouteProcessor(List.of("{{service.name}}"), List.of("{{service.environment}}")); processor.execute(ingestDocument); assertDataSetFields(ingestDocument, "logs", "opbeans_java", "dev"); + assertThat(ingestDocument.isReroute(), is(true)); } public void testRerouteToCurrentTarget() throws Exception { @@ -142,6 +154,7 @@ public void testRerouteToCurrentTarget() throws Exception { ); processor.execute(ingestDocument); assertDataSetFields(ingestDocument, "logs", "generic", "default"); + assertThat(ingestDocument.isReroute(), is(true)); assertFalse(ingestDocument.hasField("pipeline_is_continued")); } @@ -156,6 +169,7 @@ public void testFieldReferenceWithMissingReroutesToCurrentTarget() throws Except processor.execute(ingestDocument); assertThat(ingestDocument.getFieldValue("_index", String.class), equalTo("logs-generic-default")); assertDataSetFields(ingestDocument, "logs", "generic", "default"); + assertThat(ingestDocument.isReroute(), is(true)); assertFalse(ingestDocument.hasField("pipeline_is_continued")); } @@ -170,6 +184,7 @@ public void testDataStreamFieldReference() throws Exception { ); processor.execute(ingestDocument); assertDataSetFields(ingestDocument, "logs", "dataset_from_doc", "namespace_from_doc"); + assertThat(ingestDocument.isReroute(), is(true)); } public void testDatasetFieldReferenceMissingValue() throws Exception { @@ -181,6 +196,7 @@ public void testDatasetFieldReferenceMissingValue() throws Exception { ); processor.execute(ingestDocument); assertDataSetFields(ingestDocument, "logs", "fallback", "fallback"); + assertThat(ingestDocument.isReroute(), is(true)); } public void testDatasetFieldReference() throws Exception { @@ -194,6 +210,7 @@ public void testDatasetFieldReference() throws Exception { ); processor.execute(ingestDocument); assertDataSetFields(ingestDocument, "logs", "generic", "default"); + assertThat(ingestDocument.isReroute(), is(true)); } public void testFallbackToValuesFrom_index() throws Exception { @@ -204,6 +221,7 @@ public void testFallbackToValuesFrom_index() throws Exception { RerouteProcessor processor = createRerouteProcessor(List.of("{{foo}}"), List.of("{{bar}}")); processor.execute(ingestDocument); assertDataSetFields(ingestDocument, "logs", "generic", "default"); + assertThat(ingestDocument.isReroute(), is(true)); } public void testInvalidDataStreamName() throws Exception { @@ -212,6 +230,7 @@ public void testInvalidDataStreamName() throws Exception { RerouteProcessor processor = createRerouteProcessor(List.of(), List.of()); IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> processor.execute(ingestDocument)); assertThat(e.getMessage(), equalTo("invalid data stream name: [foo]; must follow naming scheme --")); + assertThat(ingestDocument.isReroute(), is(false)); } { @@ -220,6 +239,7 @@ public void testInvalidDataStreamName() throws Exception { RerouteProcessor processor = createRerouteProcessor("bar"); processor.execute(ingestDocument); assertThat(ingestDocument.getFieldValue("_index", String.class), equalTo("bar")); + assertThat(ingestDocument.isReroute(), is(true)); } } @@ -229,6 +249,7 @@ public void testRouteOnNonStringFieldFails() { RerouteProcessor processor = createRerouteProcessor(List.of("{{numeric_field}}"), List.of()); IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> processor.execute(ingestDocument)); assertThat(e.getMessage(), equalTo("field [numeric_field] of type [java.lang.Integer] cannot be cast to [java.lang.String]")); + assertThat(ingestDocument.isReroute(), is(false)); } public void testDatasetSanitization() { diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestDocument.java b/server/src/main/java/org/elasticsearch/ingest/IngestDocument.java index 676a347c5890c..106c3499e4108 100644 --- a/server/src/main/java/org/elasticsearch/ingest/IngestDocument.java +++ b/server/src/main/java/org/elasticsearch/ingest/IngestDocument.java @@ -932,7 +932,7 @@ public void reroute(String destIndex) { * * @return whether the document is redirected to another target */ - boolean isReroute() { + public boolean isReroute() { return reroute; } @@ -940,7 +940,7 @@ boolean isReroute() { * Set the {@link #reroute} flag to false so that subsequent calls to {@link #isReroute()} will return false until/unless * {@link #reroute(String)} is called. */ - void resetReroute() { + public void resetReroute() { reroute = false; }