Skip to content

ingest pipeline reroute: allow external inspection/reset of state #96934

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.util.Map;

import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;

public class RerouteProcessorTests extends ESTestCase {

Expand All @@ -29,6 +30,7 @@ public void testDefaults() throws Exception {
RerouteProcessor processor = createRerouteProcessor(List.of(), List.of());
processor.execute(ingestDocument);
assertDataSetFields(ingestDocument, "logs", "generic", "default");
assertThat(ingestDocument.isReroute(), is(true));
}

public void testEventDataset() throws Exception {
Expand All @@ -38,6 +40,7 @@ public void testEventDataset() throws Exception {
RerouteProcessor processor = createRerouteProcessor(List.of("{{event.dataset}}"), List.of());
processor.execute(ingestDocument);
assertDataSetFields(ingestDocument, "logs", "foo", "default");
assertThat(ingestDocument.isReroute(), is(true));
assertThat(ingestDocument.getFieldValue("event.dataset", String.class), equalTo("foo"));
}

Expand All @@ -48,6 +51,7 @@ public void testEventDatasetDottedFieldName() throws Exception {
RerouteProcessor processor = createRerouteProcessor(List.of("{{event.dataset}}"), List.of());
processor.execute(ingestDocument);
assertDataSetFields(ingestDocument, "logs", "foo", "default");
assertThat(ingestDocument.isReroute(), is(true));
assertThat(ingestDocument.getCtxMap().get("event.dataset"), equalTo("foo"));
assertFalse(ingestDocument.getCtxMap().containsKey("event"));
}
Expand All @@ -59,6 +63,7 @@ public void testNoDataset() throws Exception {
RerouteProcessor processor = createRerouteProcessor(List.of("{{ds}}"), List.of());
processor.execute(ingestDocument);
assertDataSetFields(ingestDocument, "logs", "foo", "default");
assertThat(ingestDocument.isReroute(), is(true));
assertFalse(ingestDocument.hasField("event.dataset"));
}

Expand All @@ -70,6 +75,7 @@ public void testSkipFirstProcessor() throws Exception {
CompoundProcessor processor = new CompoundProcessor(new SkipProcessor(skippedProcessor), executedProcessor);
processor.execute(ingestDocument);
assertDataSetFields(ingestDocument, "logs", "executed", "default");
assertThat(ingestDocument.isReroute(), is(true));
}

public void testSkipLastProcessor() throws Exception {
Expand All @@ -80,6 +86,7 @@ public void testSkipLastProcessor() throws Exception {
CompoundProcessor processor = new CompoundProcessor(executedProcessor, skippedProcessor);
processor.execute(ingestDocument);
assertDataSetFields(ingestDocument, "logs", "executed", "default");
assertThat(ingestDocument.isReroute(), is(true));
}

public void testDataStreamFieldsFromDocument() throws Exception {
Expand All @@ -90,6 +97,7 @@ public void testDataStreamFieldsFromDocument() throws Exception {
RerouteProcessor processor = createRerouteProcessor(List.of(), List.of());
processor.execute(ingestDocument);
assertDataSetFields(ingestDocument, "logs", "foo", "bar");
assertThat(ingestDocument.isReroute(), is(true));
}

public void testDataStreamFieldsFromDocumentDottedNotation() throws Exception {
Expand All @@ -101,6 +109,7 @@ public void testDataStreamFieldsFromDocumentDottedNotation() throws Exception {
RerouteProcessor processor = createRerouteProcessor(List.of(), List.of());
processor.execute(ingestDocument);
assertDataSetFields(ingestDocument, "logs", "foo", "bar");
assertThat(ingestDocument.isReroute(), is(true));
}

public void testInvalidDataStreamFieldsFromDocument() throws Exception {
Expand All @@ -111,6 +120,7 @@ public void testInvalidDataStreamFieldsFromDocument() throws Exception {
RerouteProcessor processor = createRerouteProcessor(List.of(), List.of());
processor.execute(ingestDocument);
assertDataSetFields(ingestDocument, "logs", "foo_bar", "baz_qux");
assertThat(ingestDocument.isReroute(), is(true));
}

public void testDestination() throws Exception {
Expand All @@ -120,6 +130,7 @@ public void testDestination() throws Exception {
processor.execute(ingestDocument);
assertFalse(ingestDocument.hasField("data_stream"));
assertThat(ingestDocument.getFieldValue("_index", String.class), equalTo("foo"));
assertThat(ingestDocument.isReroute(), is(true));
}

public void testFieldReference() throws Exception {
Expand All @@ -130,6 +141,7 @@ public void testFieldReference() throws Exception {
RerouteProcessor processor = createRerouteProcessor(List.of("{{service.name}}"), List.of("{{service.environment}}"));
processor.execute(ingestDocument);
assertDataSetFields(ingestDocument, "logs", "opbeans_java", "dev");
assertThat(ingestDocument.isReroute(), is(true));
}

public void testRerouteToCurrentTarget() throws Exception {
Expand All @@ -142,6 +154,7 @@ public void testRerouteToCurrentTarget() throws Exception {
);
processor.execute(ingestDocument);
assertDataSetFields(ingestDocument, "logs", "generic", "default");
assertThat(ingestDocument.isReroute(), is(true));
assertFalse(ingestDocument.hasField("pipeline_is_continued"));
}

Expand All @@ -156,6 +169,7 @@ public void testFieldReferenceWithMissingReroutesToCurrentTarget() throws Except
processor.execute(ingestDocument);
assertThat(ingestDocument.getFieldValue("_index", String.class), equalTo("logs-generic-default"));
assertDataSetFields(ingestDocument, "logs", "generic", "default");
assertThat(ingestDocument.isReroute(), is(true));
assertFalse(ingestDocument.hasField("pipeline_is_continued"));
}

Expand All @@ -170,6 +184,7 @@ public void testDataStreamFieldReference() throws Exception {
);
processor.execute(ingestDocument);
assertDataSetFields(ingestDocument, "logs", "dataset_from_doc", "namespace_from_doc");
assertThat(ingestDocument.isReroute(), is(true));
}

public void testDatasetFieldReferenceMissingValue() throws Exception {
Expand All @@ -181,6 +196,7 @@ public void testDatasetFieldReferenceMissingValue() throws Exception {
);
processor.execute(ingestDocument);
assertDataSetFields(ingestDocument, "logs", "fallback", "fallback");
assertThat(ingestDocument.isReroute(), is(true));
}

public void testDatasetFieldReference() throws Exception {
Expand All @@ -194,6 +210,7 @@ public void testDatasetFieldReference() throws Exception {
);
processor.execute(ingestDocument);
assertDataSetFields(ingestDocument, "logs", "generic", "default");
assertThat(ingestDocument.isReroute(), is(true));
}

public void testFallbackToValuesFrom_index() throws Exception {
Expand All @@ -204,6 +221,7 @@ public void testFallbackToValuesFrom_index() throws Exception {
RerouteProcessor processor = createRerouteProcessor(List.of("{{foo}}"), List.of("{{bar}}"));
processor.execute(ingestDocument);
assertDataSetFields(ingestDocument, "logs", "generic", "default");
assertThat(ingestDocument.isReroute(), is(true));
}

public void testInvalidDataStreamName() throws Exception {
Expand All @@ -212,6 +230,7 @@ public void testInvalidDataStreamName() throws Exception {
RerouteProcessor processor = createRerouteProcessor(List.of(), List.of());
IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> processor.execute(ingestDocument));
assertThat(e.getMessage(), equalTo("invalid data stream name: [foo]; must follow naming scheme <type>-<dataset>-<namespace>"));
assertThat(ingestDocument.isReroute(), is(false));
}

{
Expand All @@ -220,6 +239,7 @@ public void testInvalidDataStreamName() throws Exception {
RerouteProcessor processor = createRerouteProcessor("bar");
processor.execute(ingestDocument);
assertThat(ingestDocument.getFieldValue("_index", String.class), equalTo("bar"));
assertThat(ingestDocument.isReroute(), is(true));
}
}

Expand All @@ -229,6 +249,7 @@ public void testRouteOnNonStringFieldFails() {
RerouteProcessor processor = createRerouteProcessor(List.of("{{numeric_field}}"), List.of());
IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> processor.execute(ingestDocument));
assertThat(e.getMessage(), equalTo("field [numeric_field] of type [java.lang.Integer] cannot be cast to [java.lang.String]"));
assertThat(ingestDocument.isReroute(), is(false));
}

public void testDatasetSanitization() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -932,15 +932,15 @@ public void reroute(String destIndex) {
*
* @return whether the document is redirected to another target
*/
boolean isReroute() {
public boolean isReroute() {
return reroute;
}

/**
* Set the {@link #reroute} flag to false so that subsequent calls to {@link #isReroute()} will return false until/unless
* {@link #reroute(String)} is called.
*/
void resetReroute() {
public void resetReroute() {
reroute = false;
}

Expand Down