Skip to content

Commit 96c1bc0

Browse files
committed
Propagate the ingest pipeline access pattern flag to the ingest document when executing
1 parent 2144bae commit 96c1bc0

File tree

3 files changed

+131
-5
lines changed

3 files changed

+131
-5
lines changed

server/src/main/java/org/elasticsearch/ingest/IngestDocument.java

Lines changed: 30 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -27,12 +27,14 @@
2727

2828
import java.time.ZoneOffset;
2929
import java.time.ZonedDateTime;
30+
import java.util.ArrayDeque;
3031
import java.util.ArrayList;
3132
import java.util.Arrays;
3233
import java.util.Base64;
3334
import java.util.Collection;
3435
import java.util.Collections;
3536
import java.util.Date;
37+
import java.util.Deque;
3638
import java.util.HashMap;
3739
import java.util.LinkedHashSet;
3840
import java.util.List;
@@ -73,6 +75,13 @@ public final class IngestDocument {
7375
// Contains all pipelines that have been executed for this document
7476
private final Set<String> executedPipelines = new LinkedHashSet<>();
7577

78+
/**
79+
* Maintains the stack of access patterns for each pipeline that this document is currently being processed by.
80+
* When a pipeline with one access pattern calls another pipeline with a different one, we must ensure the access patterns
81+
* are correctly restored when returning from a nested pipeline to an enclosing pipeline.
82+
*/
83+
private final Deque<IngestPipelineFieldAccessPattern> accessPatternStack = new ArrayDeque<>();
84+
7685
/**
7786
* An ordered set of the values of the _index that have been used for this document.
7887
* <p>
@@ -114,12 +123,13 @@ public IngestDocument(IngestDocument other) {
114123
deepCopyMap(other.ingestMetadata)
115124
);
116125
/*
117-
* The executedPipelines field is clearly execution-centric rather than data centric. Despite what the comment above says, we're
118-
* copying it here anyway. THe reason is that this constructor is only called from two non-test locations, and both of those
119-
* involve the simulate pipeline logic. The simulate pipeline logic needs this information. Rather than making the code more
120-
* complicated, we're just copying this over here since it does no harm.
126+
* The executedPipelines and accessPatternStack fields are clearly execution-centric rather than data centric.
127+
* Despite what the comment above says, we're copying them here anyway. The reason is that this constructor is only called from
128+
* two non-test locations, and both of those involve the simulate pipeline logic. The simulate pipeline logic needs this
129+
* information. Rather than making the code more complicated, we're just copying them over here since it does no harm.
121130
*/
122131
this.executedPipelines.addAll(other.executedPipelines);
132+
this.accessPatternStack.addAll(other.accessPatternStack);
123133
}
124134

125135
/**
@@ -856,8 +866,17 @@ public void executePipeline(Pipeline pipeline, BiConsumer<IngestDocument, Except
856866
);
857867
} else if (executedPipelines.add(pipeline.getId())) {
858868
Object previousPipeline = ingestMetadata.put("pipeline", pipeline.getId());
869+
IngestPipelineFieldAccessPattern previousAccessPattern = accessPatternStack.peek();
870+
accessPatternStack.push(pipeline.getFieldAccessPattern());
859871
pipeline.execute(this, (result, e) -> {
860872
executedPipelines.remove(pipeline.getId());
873+
accessPatternStack.poll();
874+
assert previousAccessPattern == accessPatternStack.peek()
875+
: "Cleared access pattern from nested pipeline and found inconsistent stack state. Expected ["
876+
+ previousAccessPattern
877+
+ "] but found ["
878+
+ accessPatternStack.peek()
879+
+ "]";
861880
if (previousPipeline != null) {
862881
ingestMetadata.put("pipeline", previousPipeline);
863882
} else {
@@ -879,6 +898,13 @@ List<String> getPipelineStack() {
879898
return pipelineStack;
880899
}
881900

901+
/**
902+
* @return The access pattern of the innermost pipeline currently executing on this document, or null if no pipeline is in progress
903+
*/
904+
public IngestPipelineFieldAccessPattern getCurrentAccessPattern() {
905+
return accessPatternStack.peek();
906+
}
907+
882908
/**
883909
* Adds an index to the index history for this document, returning true if the index
884910
* was added to the index history (i.e. if it wasn't already in the index history).

server/src/main/java/org/elasticsearch/ingest/Pipeline.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525
/**
2626
* A pipeline is a list of {@link Processor} instances grouped under a unique id.
2727
*/
28-
public final class Pipeline {
28+
public class Pipeline {
2929

3030
public static final String DESCRIPTION_KEY = "description";
3131
public static final String PROCESSORS_KEY = "processors";

server/src/test/java/org/elasticsearch/ingest/IngestDocumentTests.java

Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,12 +9,15 @@
99

1010
package org.elasticsearch.ingest;
1111

12+
import org.elasticsearch.cluster.metadata.DataStream;
1213
import org.elasticsearch.common.bytes.BytesArray;
1314
import org.elasticsearch.common.xcontent.XContentHelper;
1415
import org.elasticsearch.test.ESTestCase;
1516
import org.elasticsearch.xcontent.XContentType;
1617
import org.hamcrest.Matchers;
18+
import org.junit.Assume;
1719
import org.junit.Before;
20+
import org.mockito.ArgumentCaptor;
1821

1922
import java.time.Instant;
2023
import java.time.ZoneId;
@@ -25,12 +28,14 @@
2528
import java.util.List;
2629
import java.util.Map;
2730
import java.util.Set;
31+
import java.util.function.BiConsumer;
2832
import java.util.stream.DoubleStream;
2933

3034
import static org.elasticsearch.ingest.IngestDocumentMatcher.assertIngestDocument;
3135
import static org.hamcrest.Matchers.both;
3236
import static org.hamcrest.Matchers.containsInAnyOrder;
3337
import static org.hamcrest.Matchers.containsString;
38+
import static org.hamcrest.Matchers.empty;
3439
import static org.hamcrest.Matchers.equalTo;
3540
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
3641
import static org.hamcrest.Matchers.instanceOf;
@@ -40,6 +45,10 @@
4045
import static org.hamcrest.Matchers.notNullValue;
4146
import static org.hamcrest.Matchers.nullValue;
4247
import static org.hamcrest.Matchers.sameInstance;
48+
import static org.mockito.ArgumentMatchers.eq;
49+
import static org.mockito.Mockito.mock;
50+
import static org.mockito.Mockito.verify;
51+
import static org.mockito.Mockito.when;
4352

4453
public class IngestDocumentTests extends ESTestCase {
4554

@@ -1245,4 +1254,95 @@ public void testSourceHashMapIsNotCopied() {
12451254
assertThat(document2.getCtxMap().getMetadata(), not(sameInstance(document1.getCtxMap().getMetadata())));
12461255
}
12471256
}
1257+
1258+
/**
1259+
* When executing nested pipelines on an ingest document, the document should keep track of each pipeline's access pattern for the
1260+
* lifetime of each pipeline execution. When a pipeline execution concludes, it should clear its access pattern from the document and
1261+
* restore the previous pipeline's access pattern.
1262+
*/
1263+
public void testNestedAccessPatternPropagation() {
1264+
Assume.assumeTrue(DataStream.LOGS_STREAM_FEATURE_FLAG);
1265+
1266+
Map<String, Object> source = new HashMap<>(Map.of("foo", 1));
1267+
IngestDocument document = new IngestDocument("index", "id", 1, null, null, source);
1268+
1269+
// 1-5 nested calls
1270+
doTestNestedAccessPatternPropagation(0, randomIntBetween(1, 5), document);
1271+
1272+
// At the end of the test, there should be neither pipeline ids nor access patterns left in the stack.
1273+
assertThat(document.getPipelineStack(), is(empty()));
1274+
assertThat(document.getCurrentAccessPattern(), is(nullValue()));
1275+
}
1276+
1277+
/**
1278+
* Recursively execute some number of pipelines at various call depths to simulate a robust chain of pipelines being called on a
1279+
* document.
1280+
* @param level The current call depth. This is how many pipelines deep into the nesting we are.
1281+
* @param maxCallDepth The maximum call depth to reach in the test. If this is greater than the current level, we will
1282+
* recurse in at least one of the pipelines executed at this level. If the current level is equal to the max call
1283+
* depth we will run some pipelines but recurse no further before returning.
1284+
* @param document The document to repeatedly use and verify against.
1285+
*/
1286+
void doTestNestedAccessPatternPropagation(int level, int maxCallDepth, IngestDocument document) {
1287+
// 1-7 pipelines to be run at any given level
1288+
logger.debug("LEVEL {}/{}: BEGIN", level, maxCallDepth);
1289+
int pipelinesAtThisLevel = randomIntBetween(1, 7);
1290+
logger.debug("Run pipelines: {}", pipelinesAtThisLevel);
1291+
1292+
boolean recursed = false;
1293+
if (level >= maxCallDepth) {
1294+
// If we're at max call depth, do no recursions
1295+
recursed = true;
1296+
logger.debug("No more recursions");
1297+
}
1298+
1299+
for (int pipelineIdx = 0; pipelineIdx < pipelinesAtThisLevel; pipelineIdx++) {
1300+
String expectedPipelineId = randomAlphaOfLength(20);
1301+
IngestPipelineFieldAccessPattern expectedAccessPattern = randomFrom(IngestPipelineFieldAccessPattern.values());
1302+
1303+
// We mock the pipeline because it's easier to verify calls and doesn't
1304+
// need us to force a stall in the execution logic to half apply it.
1305+
Pipeline mockPipeline = mock(Pipeline.class);
1306+
when(mockPipeline.getId()).thenReturn(expectedPipelineId);
1307+
when(mockPipeline.getProcessors()).thenReturn(List.of(new TestProcessor((doc) -> {})));
1308+
when(mockPipeline.getFieldAccessPattern()).thenReturn(expectedAccessPattern);
1309+
@SuppressWarnings("unchecked")
1310+
BiConsumer<IngestDocument, Exception> mockHandler = mock(BiConsumer.class);
1311+
1312+
// Execute pipeline
1313+
logger.debug("LEVEL {}/{}: Executing {}/{}", level, maxCallDepth, pipelineIdx, pipelinesAtThisLevel);
1314+
document.executePipeline(mockPipeline, mockHandler);
1315+
1316+
// Verify pipeline was called, capture completion handler
1317+
ArgumentCaptor<BiConsumer<IngestDocument, Exception>> argumentCaptor = ArgumentCaptor.captor();
1318+
verify(mockPipeline).execute(eq(document), argumentCaptor.capture());
1319+
1320+
// Assert expected state
1321+
assertThat(document.getPipelineStack().getFirst(), is(expectedPipelineId));
1322+
assertThat(document.getCurrentAccessPattern(), is(expectedAccessPattern));
1323+
1324+
// Randomly recurse: We recurse only one time per level to avoid hogging test time, but we randomize which
1325+
// pipeline to recurse on, eventually requiring a recursion on the last pipeline run if one hasn't happened yet.
1326+
if (recursed == false && (randomBoolean() || pipelineIdx == pipelinesAtThisLevel - 1)) {
1327+
logger.debug("Recursed on pipeline {}", pipelineIdx);
1328+
doTestNestedAccessPatternPropagation(level + 1, maxCallDepth, document);
1329+
recursed = true;
1330+
}
1331+
1332+
// Pull up the captured completion handler to conclude the pipeline run
1333+
argumentCaptor.getValue().accept(document, null);
1334+
1335+
// Assert expected state
1336+
assertThat(document.getPipelineStack().size(), is(equalTo(level)));
1337+
if (level == 0) {
1338+
// Top level means access pattern should be empty
1339+
assertThat(document.getCurrentAccessPattern(), is(nullValue()));
1340+
} else {
1341+
// If we're nested below the top level we should still have an access
1342+
// pattern on the document for the pipeline above us
1343+
assertThat(document.getCurrentAccessPattern(), is(not(nullValue())));
1344+
}
1345+
}
1346+
logger.debug("LEVEL {}/{}: COMPLETE", level, maxCallDepth);
1347+
}
12481348
}

0 commit comments

Comments
 (0)