34 changes: 30 additions & 4 deletions server/src/main/java/org/elasticsearch/ingest/IngestDocument.java
@@ -27,12 +27,14 @@

import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Base64;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.Deque;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
@@ -73,6 +75,13 @@ public final class IngestDocument {
// Contains all pipelines that have been executed for this document
private final Set<String> executedPipelines = new LinkedHashSet<>();

/**
* Maintains the stack of access patterns for each pipeline that this document is currently being processed by.
* When a pipeline with one access pattern calls another pipeline with a different one, we must ensure the access patterns
* are correctly restored when returning from a nested pipeline to an enclosing pipeline.
*/
private final Deque<IngestPipelineFieldAccessPattern> accessPatternStack = new ArrayDeque<>();

/**
* An ordered set of the values of the _index that have been used for this document.
* <p>
@@ -114,12 +123,13 @@ public IngestDocument(IngestDocument other) {
deepCopyMap(other.ingestMetadata)
);
/*
* The executedPipelines field is clearly execution-centric rather than data centric. Despite what the comment above says, we're
* copying it here anyway. THe reason is that this constructor is only called from two non-test locations, and both of those
* involve the simulate pipeline logic. The simulate pipeline logic needs this information. Rather than making the code more
* complicated, we're just copying this over here since it does no harm.
* The executedPipelines and accessPatternStack fields are clearly execution-centric rather than data centric.
* Despite what the comment above says, we're copying them here anyway. The reason is that this constructor is only called from
* two non-test locations, and both of those involve the simulate pipeline logic. The simulate pipeline logic needs this
* information. Rather than making the code more complicated, we're just copying them over here since it does no harm.
*/
this.executedPipelines.addAll(other.executedPipelines);
this.accessPatternStack.addAll(other.accessPatternStack);
}

/**
@@ -856,8 +866,17 @@ public void executePipeline(Pipeline pipeline, BiConsumer<IngestDocument, Except
);
} else if (executedPipelines.add(pipeline.getId())) {
Object previousPipeline = ingestMetadata.put("pipeline", pipeline.getId());
IngestPipelineFieldAccessPattern previousAccessPattern = accessPatternStack.peek();
accessPatternStack.push(pipeline.getFieldAccessPattern());
pipeline.execute(this, (result, e) -> {
executedPipelines.remove(pipeline.getId());
accessPatternStack.poll();
assert previousAccessPattern == accessPatternStack.peek()
: "Cleared access pattern from nested pipeline and found inconsistent stack state. Expected ["
+ previousAccessPattern
+ "] but found ["
+ accessPatternStack.peek()
+ "]";
if (previousPipeline != null) {
ingestMetadata.put("pipeline", previousPipeline);
} else {
@@ -879,6 +898,13 @@ List<String> getPipelineStack() {
return pipelineStack;
}

/**
* @return The access pattern for any currently executing pipelines, or null if no pipelines are in progress for this doc
*/
public IngestPipelineFieldAccessPattern getCurrentAccessPattern() {
return accessPatternStack.peek();
}

Member: Is it intentional that this method isn't used above (and seems to only be used in the test)?

Member (Author): not used above yet. I'm currently using it in a couple places in the next set of changes I'm working on.

/**
* Adds an index to the index history for this document, returning true if the index
* was added to the index history (i.e. if it wasn't already in the index history).
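For readers skimming the diff: the push/poll pairing in executePipeline above gives a stack-scoped value, so the enclosing pipeline's pattern is simply whatever remains on top once a nested pipeline pops its own entry. Below is a minimal, self-contained sketch of that behavior, using a placeholder enum rather than the real IngestPipelineFieldAccessPattern (whose constants are not shown in this diff).

import java.util.ArrayDeque;
import java.util.Deque;

// Standalone illustration of the stack discipline used by accessPatternStack above:
// push on pipeline entry, poll on completion, peek to read the pattern currently in effect.
public class AccessPatternStackSketch {

    // Placeholder enum; the real IngestPipelineFieldAccessPattern constants are not part of this diff.
    enum Pattern { OUTER, NESTED }

    public static void main(String[] args) {
        Deque<Pattern> stack = new ArrayDeque<>();

        stack.push(Pattern.OUTER);         // outer pipeline begins
        stack.push(Pattern.NESTED);        // nested pipeline begins with a different pattern
        System.out.println(stack.peek());  // NESTED: pattern in effect while the inner pipeline runs

        stack.poll();                      // nested pipeline completes
        System.out.println(stack.peek());  // OUTER: the enclosing pipeline's pattern is restored

        stack.poll();                      // outer pipeline completes
        System.out.println(stack.peek());  // null: no pipeline in progress, matching getCurrentAccessPattern()
    }
}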
server/src/main/java/org/elasticsearch/ingest/Pipeline.java
@@ -25,7 +25,7 @@
/**
* A pipeline is a list of {@link Processor} instances grouped under a unique id.
*/
public final class Pipeline {
public class Pipeline {

public static final String DESCRIPTION_KEY = "description";
public static final String PROCESSORS_KEY = "processors";
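(Presumably the final modifier is removed so that the new test below can create a mock(Pipeline.class); Mockito's default mock maker cannot mock final classes. The diff itself does not state the motivation.)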
100 changes: 100 additions & 0 deletions server/src/test/java/org/elasticsearch/ingest/IngestDocumentTests.java
@@ -9,12 +9,15 @@

package org.elasticsearch.ingest;

import org.elasticsearch.cluster.metadata.DataStream;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xcontent.XContentType;
import org.hamcrest.Matchers;
import org.junit.Assume;
import org.junit.Before;
import org.mockito.ArgumentCaptor;

import java.time.Instant;
import java.time.ZoneId;
@@ -25,12 +28,14 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.BiConsumer;
import java.util.stream.DoubleStream;

import static org.elasticsearch.ingest.IngestDocumentMatcher.assertIngestDocument;
import static org.hamcrest.Matchers.both;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.instanceOf;
@@ -40,6 +45,10 @@
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
import static org.hamcrest.Matchers.sameInstance;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

public class IngestDocumentTests extends ESTestCase {

@@ -1245,4 +1254,95 @@ public void testSourceHashMapIsNotCopied() {
assertThat(document2.getCtxMap().getMetadata(), not(sameInstance(document1.getCtxMap().getMetadata())));
}
}

/**
* When executing nested pipelines on an ingest document, the document should keep track of each pipeline's access pattern for the
* lifetime of each pipeline execution. When a pipeline execution concludes, it should clear its access pattern from the document and
* restore the previous pipeline's access pattern.
*/
public void testNestedAccessPatternPropagation() {
Assume.assumeTrue(DataStream.LOGS_STREAM_FEATURE_FLAG);

Map<String, Object> source = new HashMap<>(Map.of("foo", 1));
IngestDocument document = new IngestDocument("index", "id", 1, null, null, source);

// 1-5 levels of nested calls
doTestNestedAccessPatternPropagation(0, randomIntBetween(1, 5), document);

// At the end of the test, there should be neither pipeline ids nor access patterns left in the stack.
assertThat(document.getPipelineStack(), is(empty()));
assertThat(document.getCurrentAccessPattern(), is(nullValue()));
}

/**
* Recursively execute some number of pipelines at various call depths to simulate a realistic chain of nested pipelines being
* called on a document.
* @param level The current call depth. This is how many pipelines deep into the nesting we are.
* @param maxCallDepth The maximum call depth for the test. If this is greater than the current level, we will recurse in at least
*                     one of the pipelines executed at this level. If the current level is equal to the max call depth, we will
*                     run some pipelines but recurse no further before returning.
* @param document The document to repeatedly use and verify against.
*/
void doTestNestedAccessPatternPropagation(int level, int maxCallDepth, IngestDocument document) {
// 1-7 pipelines to be run at any given level
logger.debug("LEVEL {}/{}: BEGIN", level, maxCallDepth);
int pipelinesAtThisLevel = randomIntBetween(1, 7);
logger.debug("Run pipelines: {}", pipelinesAtThisLevel);

boolean recursed = false;
if (level >= maxCallDepth) {
// If we're at max call depth, do no recursions
recursed = true;
logger.debug("No more recursions");
}

for (int pipelineIdx = 0; pipelineIdx < pipelinesAtThisLevel; pipelineIdx++) {
String expectedPipelineId = randomAlphaOfLength(20);
IngestPipelineFieldAccessPattern expectedAccessPattern = randomFrom(IngestPipelineFieldAccessPattern.values());

// We mock the pipeline because it's easier to verify calls, and we don't need
// to force a stall in the execution logic to observe it in a half-applied state.
Pipeline mockPipeline = mock(Pipeline.class);
when(mockPipeline.getId()).thenReturn(expectedPipelineId);
when(mockPipeline.getProcessors()).thenReturn(List.of(new TestProcessor((doc) -> {})));
when(mockPipeline.getFieldAccessPattern()).thenReturn(expectedAccessPattern);
@SuppressWarnings("unchecked")
BiConsumer<IngestDocument, Exception> mockHandler = mock(BiConsumer.class);

// Execute pipeline
logger.debug("LEVEL {}/{}: Executing {}/{}", level, maxCallDepth, pipelineIdx, pipelinesAtThisLevel);
document.executePipeline(mockPipeline, mockHandler);

// Verify pipeline was called, capture completion handler
ArgumentCaptor<BiConsumer<IngestDocument, Exception>> argumentCaptor = ArgumentCaptor.captor();
verify(mockPipeline).execute(eq(document), argumentCaptor.capture());

// Assert expected state
assertThat(document.getPipelineStack().getFirst(), is(expectedPipelineId));
assertThat(document.getCurrentAccessPattern(), is(expectedAccessPattern));

// Randomly recurse: We recurse only one time per level to avoid hogging test time, but we randomize which
// pipeline to recurse on, eventually requiring a recursion on the last pipeline run if one hasn't happened yet.
if (recursed == false && (randomBoolean() || pipelineIdx == pipelinesAtThisLevel - 1)) {
logger.debug("Recursed on pipeline {}", pipelineIdx);
doTestNestedAccessPatternPropagation(level + 1, maxCallDepth, document);
recursed = true;
}

// Pull up the captured completion handler to conclude the pipeline run
argumentCaptor.getValue().accept(document, null);

// Assert expected state
assertThat(document.getPipelineStack().size(), is(equalTo(level)));
if (level == 0) {
// Top level means access pattern should be empty
assertThat(document.getCurrentAccessPattern(), is(nullValue()));
} else {
// If we're nested below the top level we should still have an access
// pattern on the document for the pipeline above us
assertThat(document.getCurrentAccessPattern(), is(not(nullValue())));
}
}
logger.debug("LEVEL {}/{}: COMPLETE", level, maxCallDepth);
}
}
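One non-obvious piece of the test above is how it completes a mocked pipeline: it captures the completion handler that executePipeline hands to Pipeline.execute, then invokes that handler later so the pipeline "finishes" at a point the test controls. Below is a small standalone sketch of that Mockito pattern, using a hypothetical Executor interface (not an Elasticsearch type) and assuming a Mockito version that provides ArgumentCaptor.captor() (5.7+).

import java.util.function.BiConsumer;

import org.mockito.ArgumentCaptor;

import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;

// Standalone illustration of the capture-then-complete pattern used in the test:
// mock a callback-style collaborator, capture the handler it receives, and invoke
// the handler when the test decides the asynchronous work has finished.
public class CaptureHandlerSketch {

    // Hypothetical collaborator, standing in for Pipeline.execute(IngestDocument, BiConsumer).
    interface Executor {
        void execute(String doc, BiConsumer<String, Exception> handler);
    }

    public static void main(String[] args) {
        Executor mockExecutor = mock(Executor.class);

        // "Production" call: pass a completion handler to the collaborator.
        mockExecutor.execute("doc-1", (doc, e) -> System.out.println("completed: " + doc));

        // "Test" side: verify the call and capture the handler that was passed in.
        ArgumentCaptor<BiConsumer<String, Exception>> captor = ArgumentCaptor.captor();
        verify(mockExecutor).execute(eq("doc-1"), captor.capture());

        // Complete the work on demand, just like argumentCaptor.getValue().accept(document, null) in the test above.
        captor.getValue().accept("doc-1", null);
    }
}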