Skip to content

Commit 89770c5

Browse files
authored
Refactor IngestDocument reroute recursion detection (#95350)
1 parent 094fd69 commit 89770c5

File tree

3 files changed

+67
-17
lines changed

3 files changed

+67
-17
lines changed

server/src/main/java/org/elasticsearch/ingest/IngestDocument.java

Lines changed: 42 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,19 @@ public final class IngestDocument {
6262

6363
// Contains all pipelines that have been executed for this document
6464
private final Set<String> executedPipelines = new LinkedHashSet<>();
65+
66+
/**
67+
* An ordered set of the values of the _index that have been used for this document.
68+
* <p>
69+
* IMPORTANT: This is only updated after a top-level pipeline has run (see {@code IngestService#executePipelines(...)}).
70+
* <p>
71+
* For example, if a processor changes the _index for a document from 'foo' to 'bar',
72+
* and then another processor changes the value back to 'foo', then the overall effect
73+
* of the pipeline was that the _index value did not change and so only 'foo' would appear
74+
* in the index history.
75+
*/
76+
private Set<String> indexHistory = new LinkedHashSet<>();
77+
6578
private boolean doNoSelfReferencesCheck = false;
6679
private boolean reroute = false;
6780

@@ -70,21 +83,27 @@ public IngestDocument(String index, String id, long version, String routing, Ver
7083
this.ingestMetadata = new HashMap<>();
7184
this.ingestMetadata.put(TIMESTAMP, ctxMap.getMetadata().getNow());
7285
this.templateModel = initializeTemplateModel();
86+
87+
// initialize the index history by putting the current index into it
88+
this.indexHistory.add(index);
7389
}
7490

91+
// note: the rest of these constructors deal with the data-centric view of the IngestDocument, not the execution-centric view.
92+
// For example, the copy constructor doesn't populate the `executedPipelines` or `indexHistory` (as well as some other fields),
93+
// because those fields are execution-centric.
94+
7595
/**
76-
* Copy constructor that creates a new {@link IngestDocument} which has exactly the same properties as the one provided as argument
96+
* Copy constructor that creates a new {@link IngestDocument} from copies of the source, metadata, and ingest metadata of the one provided.
7797
*/
7898
public IngestDocument(IngestDocument other) {
7999
this(
80100
new IngestCtxMap(deepCopyMap(other.ctxMap.getSource()), other.ctxMap.getMetadata().clone()),
81101
deepCopyMap(other.ingestMetadata)
82102
);
83-
this.reroute = other.reroute;
84103
}
85104

86105
/**
87-
* Constructor to create an IngestDocument from its constituent maps. The maps are shallow copied.
106+
* Constructor to create an IngestDocument from its constituent maps. The maps are shallow copied.
88107
*/
89108
public IngestDocument(Map<String, Object> sourceAndMetadata, Map<String, Object> ingestMetadata) {
90109
Map<String, Object> source;
@@ -107,7 +126,7 @@ public IngestDocument(Map<String, Object> sourceAndMetadata, Map<String, Object>
107126
}
108127

109128
/**
110-
* Constructor to create an IngestDocument from its constituent maps
129+
* Constructor to create an IngestDocument from its constituent maps.
111130
*/
112131
IngestDocument(IngestCtxMap ctxMap, Map<String, Object> ingestMetadata) {
113132
this.ctxMap = Objects.requireNonNull(ctxMap);
@@ -841,6 +860,24 @@ List<String> getPipelineStack() {
841860
return pipelineStack;
842861
}
843862

863+
/**
864+
* Adds an index to the index history for this document, returning true if the index
865+
* was added to the index history (i.e. if it wasn't already in the index history).
866+
*
867+
* @param index the index to potentially add to the index history
868+
* @return true if the index history did not already contain the index in question
869+
*/
870+
public boolean updateIndexHistory(String index) {
871+
return indexHistory.add(index);
872+
}
873+
874+
/**
875+
* @return an unmodifiable view of the document's index history
876+
*/
877+
public Set<String> getIndexHistory() {
878+
return Collections.unmodifiableSet(indexHistory);
879+
}
880+
844881
/**
845882
* @return Whether a self referencing check should be performed
846883
*/
@@ -990,7 +1027,7 @@ static ResolveResult error(String errorMessage) {
9901027
/**
9911028
* Provides a shallowly read-only, very limited, map-like view of two maps. The only methods that are implemented are
9921029
* {@link Map#get(Object)} and {@link Map#containsKey(Object)}, everything else throws UnsupportedOperationException.
993-
*
1030+
* <p>
9941031
* The overrides map has higher priority than the primary map -- values in that map under some key will take priority over values
9951032
* in the primary map under the same key.
9961033
*

server/src/main/java/org/elasticsearch/ingest/IngestService.java

Lines changed: 9 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,6 @@
6969
import java.util.HashMap;
7070
import java.util.HashSet;
7171
import java.util.Iterator;
72-
import java.util.LinkedHashSet;
7372
import java.util.LinkedList;
7473
import java.util.List;
7574
import java.util.Locale;
@@ -688,9 +687,7 @@ public void onFailure(Exception e) {
688687
});
689688

690689
IngestDocument ingestDocument = newIngestDocument(indexRequest);
691-
LinkedHashSet<String> indexRecursionDetection = new LinkedHashSet<>();
692-
indexRecursionDetection.add(indexRequest.index());
693-
executePipelines(pipelines, indexRequest, ingestDocument, documentListener, indexRecursionDetection);
690+
executePipelines(pipelines, indexRequest, ingestDocument, documentListener);
694691
i++;
695692
}
696693
}
@@ -774,8 +771,7 @@ private void executePipelines(
774771
final PipelineIterator pipelines,
775772
final IndexRequest indexRequest,
776773
final IngestDocument ingestDocument,
777-
final ActionListener<Boolean> listener,
778-
final Set<String> indexRecursionDetection
774+
final ActionListener<Boolean> listener
779775
) {
780776
assert pipelines.hasNext();
781777
PipelineSlot slot = pipelines.next();
@@ -859,17 +855,18 @@ private void executePipelines(
859855
return; // document failed!
860856
}
861857

862-
// check for cycles in the visited indices
863-
if (indexRecursionDetection.add(newIndex) == false) {
864-
List<String> indexRoute = new ArrayList<>(indexRecursionDetection);
865-
indexRoute.add(newIndex);
858+
// add the index to the document's index history, and check for cycles in the visited indices
859+
boolean cycle = ingestDocument.updateIndexHistory(newIndex) == false;
860+
if (cycle) {
861+
List<String> indexCycle = new ArrayList<>(ingestDocument.getIndexHistory());
862+
indexCycle.add(newIndex);
866863
listener.onFailure(
867864
new IllegalStateException(
868865
format(
869866
"index cycle detected while processing pipeline [%s] for document [%s]: %s",
870867
pipelineId,
871868
indexRequest.id(),
872-
indexRoute
869+
indexCycle
873870
)
874871
)
875872
);
@@ -890,7 +887,7 @@ private void executePipelines(
890887
}
891888

892889
if (newPipelines.hasNext()) {
893-
executePipelines(newPipelines, indexRequest, ingestDocument, listener, indexRecursionDetection);
890+
executePipelines(newPipelines, indexRequest, ingestDocument, listener);
894891
} else {
895892
// update the index request's source and (potentially) cache the timestamp for TSDB
896893
updateIndexRequestSource(indexRequest, ingestDocument);

server/src/test/java/org/elasticsearch/ingest/IngestDocumentTests.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010

1111
import org.elasticsearch.core.Tuple;
1212
import org.elasticsearch.test.ESTestCase;
13+
import org.hamcrest.Matchers;
1314
import org.junit.Before;
1415

1516
import java.time.Instant;
@@ -1142,4 +1143,19 @@ public void testIsMetadata() {
11421143
assertFalse(IngestDocument.Metadata.isMetadata("address"));
11431144
}
11441145

1146+
public void testIndexHistory() {
1147+
// the index history contains the original index
1148+
String index1 = ingestDocument.getFieldValue("_index", String.class);
1149+
assertThat(index1, equalTo("index"));
1150+
assertThat(ingestDocument.getIndexHistory(), Matchers.contains(index1));
1151+
1152+
// it can be updated to include another index
1153+
String index2 = "another_index";
1154+
assertTrue(ingestDocument.updateIndexHistory(index2));
1155+
assertThat(ingestDocument.getIndexHistory(), Matchers.contains(index1, index2));
1156+
1157+
// an index cycle cannot be introduced, however
1158+
assertFalse(ingestDocument.updateIndexHistory(index1));
1159+
assertThat(ingestDocument.getIndexHistory(), Matchers.contains(index1, index2));
1160+
}
11451161
}

0 commit comments

Comments
 (0)