diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestDocument.java b/server/src/main/java/org/elasticsearch/ingest/IngestDocument.java index 33813e92c7f63..e3a7a96fadc56 100644 --- a/server/src/main/java/org/elasticsearch/ingest/IngestDocument.java +++ b/server/src/main/java/org/elasticsearch/ingest/IngestDocument.java @@ -62,6 +62,19 @@ public final class IngestDocument { // Contains all pipelines that have been executed for this document private final Set executedPipelines = new LinkedHashSet<>(); + + /** + * An ordered set of the values of the _index that have been used for this document. + *

+ * IMPORTANT: This is only updated after a top-level pipeline has run (see {@code IngestService#executePipelines(...)}). + *

+ * For example, if a processor changes the _index for a document from 'foo' to 'bar', + * and then another processor changes the value back to 'foo', then the overall effect + * of the pipeline was that the _index value did not change and so only 'foo' would appear + * in the index history. + */ + private Set indexHistory = new LinkedHashSet<>(); + private boolean doNoSelfReferencesCheck = false; private boolean reroute = false; @@ -70,21 +83,27 @@ public IngestDocument(String index, String id, long version, String routing, Ver this.ingestMetadata = new HashMap<>(); this.ingestMetadata.put(TIMESTAMP, ctxMap.getMetadata().getNow()); this.templateModel = initializeTemplateModel(); + + // initialize the index history by putting the current index into it + this.indexHistory.add(index); } + // note: these rest of these constructors deal with the data-centric view of the IngestDocument, not the execution-centric view. + // For example, the copy constructor doesn't populate the `executedPipelines` or `indexHistory` (as well as some other fields), + // because those fields are execution-centric. + /** - * Copy constructor that creates a new {@link IngestDocument} which has exactly the same properties as the one provided as argument + * Copy constructor that creates a new {@link IngestDocument} which has exactly the same properties as the one provided. */ public IngestDocument(IngestDocument other) { this( new IngestCtxMap(deepCopyMap(other.ctxMap.getSource()), other.ctxMap.getMetadata().clone()), deepCopyMap(other.ingestMetadata) ); - this.reroute = other.reroute; } /** - * Constructor to create an IngestDocument from its constituent maps. The maps are shallow copied. + * Constructor to create an IngestDocument from its constituent maps. The maps are shallow copied. */ public IngestDocument(Map sourceAndMetadata, Map ingestMetadata) { Map source; @@ -107,7 +126,7 @@ public IngestDocument(Map sourceAndMetadata, Map } /** - * Constructor to create an IngestDocument from its constituent maps + * Constructor to create an IngestDocument from its constituent maps. */ IngestDocument(IngestCtxMap ctxMap, Map ingestMetadata) { this.ctxMap = Objects.requireNonNull(ctxMap); @@ -841,6 +860,24 @@ List getPipelineStack() { return pipelineStack; } + /** + * Adds an index to the index history for this document, returning true if the index + * was added to the index history (i.e. if it wasn't already in the index history). + * + * @param index the index to potentially add to the index history + * @return true if the index history did not already contain the index in question + */ + public boolean updateIndexHistory(String index) { + return indexHistory.add(index); + } + + /** + * @return an unmodifiable view of the document's index history + */ + public Set getIndexHistory() { + return Collections.unmodifiableSet(indexHistory); + } + /** * @return Whether a self referencing check should be performed */ @@ -990,7 +1027,7 @@ static ResolveResult error(String errorMessage) { /** * Provides a shallowly read-only, very limited, map-like view of two maps. The only methods that are implemented are * {@link Map#get(Object)} and {@link Map#containsKey(Object)}, everything else throws UnsupportedOperationException. - * + *

* The overrides map has higher priority than the primary map -- values in that map under some key will take priority over values * in the primary map under the same key. * diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestService.java b/server/src/main/java/org/elasticsearch/ingest/IngestService.java index 7534ac64a02ca..91f97ebab3e0c 100644 --- a/server/src/main/java/org/elasticsearch/ingest/IngestService.java +++ b/server/src/main/java/org/elasticsearch/ingest/IngestService.java @@ -69,7 +69,6 @@ import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; -import java.util.LinkedHashSet; import java.util.LinkedList; import java.util.List; import java.util.Locale; @@ -688,9 +687,7 @@ public void onFailure(Exception e) { }); IngestDocument ingestDocument = newIngestDocument(indexRequest); - LinkedHashSet indexRecursionDetection = new LinkedHashSet<>(); - indexRecursionDetection.add(indexRequest.index()); - executePipelines(pipelines, indexRequest, ingestDocument, documentListener, indexRecursionDetection); + executePipelines(pipelines, indexRequest, ingestDocument, documentListener); i++; } } @@ -774,8 +771,7 @@ private void executePipelines( final PipelineIterator pipelines, final IndexRequest indexRequest, final IngestDocument ingestDocument, - final ActionListener listener, - final Set indexRecursionDetection + final ActionListener listener ) { assert pipelines.hasNext(); PipelineSlot slot = pipelines.next(); @@ -859,17 +855,18 @@ private void executePipelines( return; // document failed! } - // check for cycles in the visited indices - if (indexRecursionDetection.add(newIndex) == false) { - List indexRoute = new ArrayList<>(indexRecursionDetection); - indexRoute.add(newIndex); + // add the index to the document's index history, and check for cycles in the visited indices + boolean cycle = ingestDocument.updateIndexHistory(newIndex) == false; + if (cycle) { + List indexCycle = new ArrayList<>(ingestDocument.getIndexHistory()); + indexCycle.add(newIndex); listener.onFailure( new IllegalStateException( format( "index cycle detected while processing pipeline [%s] for document [%s]: %s", pipelineId, indexRequest.id(), - indexRoute + indexCycle ) ) ); @@ -890,7 +887,7 @@ private void executePipelines( } if (newPipelines.hasNext()) { - executePipelines(newPipelines, indexRequest, ingestDocument, listener, indexRecursionDetection); + executePipelines(newPipelines, indexRequest, ingestDocument, listener); } else { // update the index request's source and (potentially) cache the timestamp for TSDB updateIndexRequestSource(indexRequest, ingestDocument); diff --git a/server/src/test/java/org/elasticsearch/ingest/IngestDocumentTests.java b/server/src/test/java/org/elasticsearch/ingest/IngestDocumentTests.java index 2950da03c77c3..38c984c3de933 100644 --- a/server/src/test/java/org/elasticsearch/ingest/IngestDocumentTests.java +++ b/server/src/test/java/org/elasticsearch/ingest/IngestDocumentTests.java @@ -10,6 +10,7 @@ import org.elasticsearch.core.Tuple; import org.elasticsearch.test.ESTestCase; +import org.hamcrest.Matchers; import org.junit.Before; import java.time.Instant; @@ -1142,4 +1143,19 @@ public void testIsMetadata() { assertFalse(IngestDocument.Metadata.isMetadata("address")); } + public void testIndexHistory() { + // the index history contains the original index + String index1 = ingestDocument.getFieldValue("_index", String.class); + assertThat(index1, equalTo("index")); + assertThat(ingestDocument.getIndexHistory(), Matchers.contains(index1)); + + // it can be updated to include another index + String index2 = "another_index"; + assertTrue(ingestDocument.updateIndexHistory(index2)); + assertThat(ingestDocument.getIndexHistory(), Matchers.contains(index1, index2)); + + // an index cycle cannot be introduced, however + assertFalse(ingestDocument.updateIndexHistory(index1)); + assertThat(ingestDocument.getIndexHistory(), Matchers.contains(index1, index2)); + } }