- 
                Notifications
    You must be signed in to change notification settings 
- Fork 25.6k
[Streams] Propagate the ingest pipeline access pattern flag to the ingest document #130488
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
96c1bc0
              9ee402b
              debaeb0
              d7b2276
              File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | 
|---|---|---|
|  | @@ -27,12 +27,14 @@ | |
|  | ||
| import java.time.ZoneOffset; | ||
| import java.time.ZonedDateTime; | ||
| import java.util.ArrayDeque; | ||
| import java.util.ArrayList; | ||
| import java.util.Arrays; | ||
| import java.util.Base64; | ||
| import java.util.Collection; | ||
| import java.util.Collections; | ||
| import java.util.Date; | ||
| import java.util.Deque; | ||
| import java.util.HashMap; | ||
| import java.util.LinkedHashSet; | ||
| import java.util.List; | ||
|  | @@ -73,6 +75,13 @@ public final class IngestDocument { | |
| // Contains all pipelines that have been executed for this document | ||
| private final Set<String> executedPipelines = new LinkedHashSet<>(); | ||
|  | ||
| /** | ||
| * Maintains the stack of access patterns for each pipeline that this document is currently being processed by. | ||
| * When a pipeline with one access pattern calls another pipeline with a different one, we must ensure the access patterns | ||
| * are correctly restored when returning from a nested pipeline to an enclosing pipeline. | ||
| */ | ||
| private final Deque<IngestPipelineFieldAccessPattern> accessPatternStack = new ArrayDeque<>(); | ||
|  | ||
| /** | ||
| * An ordered set of the values of the _index that have been used for this document. | ||
| * <p> | ||
|  | @@ -114,12 +123,13 @@ public IngestDocument(IngestDocument other) { | |
| deepCopyMap(other.ingestMetadata) | ||
| ); | ||
| /* | ||
| * The executedPipelines field is clearly execution-centric rather than data centric. Despite what the comment above says, we're | ||
| * copying it here anyway. THe reason is that this constructor is only called from two non-test locations, and both of those | ||
| * involve the simulate pipeline logic. The simulate pipeline logic needs this information. Rather than making the code more | ||
| * complicated, we're just copying this over here since it does no harm. | ||
| * The executedPipelines and accessPatternStack fields are clearly execution-centric rather than data centric. | ||
| * Despite what the comment above says, we're copying it here anyway. THe reason is that this constructor is only called from | ||
| * two non-test locations, and both of those involve the simulate pipeline logic. The simulate pipeline logic needs this | ||
| * information. Rather than making the code more complicated, we're just copying them over here since it does no harm. | ||
| */ | ||
| this.executedPipelines.addAll(other.executedPipelines); | ||
| this.accessPatternStack.addAll(other.accessPatternStack); | ||
| } | ||
|  | ||
| /** | ||
|  | @@ -856,8 +866,17 @@ public void executePipeline(Pipeline pipeline, BiConsumer<IngestDocument, Except | |
| ); | ||
| } else if (executedPipelines.add(pipeline.getId())) { | ||
| Object previousPipeline = ingestMetadata.put("pipeline", pipeline.getId()); | ||
| IngestPipelineFieldAccessPattern previousAccessPattern = accessPatternStack.peek(); | ||
| accessPatternStack.push(pipeline.getFieldAccessPattern()); | ||
| pipeline.execute(this, (result, e) -> { | ||
| executedPipelines.remove(pipeline.getId()); | ||
| accessPatternStack.poll(); | ||
|         
                  jbaiera marked this conversation as resolved.
              Show resolved
            Hide resolved | ||
| assert previousAccessPattern == accessPatternStack.peek() | ||
| : "Cleared access pattern from nested pipeline and found inconsistent stack state. Expected [" | ||
| + previousAccessPattern | ||
| + "] but found [" | ||
| + accessPatternStack.peek() | ||
| + "]"; | ||
|         
                  jbaiera marked this conversation as resolved.
              Show resolved
            Hide resolved | ||
| if (previousPipeline != null) { | ||
| ingestMetadata.put("pipeline", previousPipeline); | ||
| } else { | ||
|  | @@ -879,6 +898,13 @@ List<String> getPipelineStack() { | |
| return pipelineStack; | ||
| } | ||
|  | ||
| /** | ||
| * @return The access pattern for any currently executing pipelines, or null if no pipelines are in progress for this doc | ||
| */ | ||
| public IngestPipelineFieldAccessPattern getCurrentAccessPattern() { | ||
| There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is it intentional that this method isn't used above (and seems to only be used in the test)? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. not used above yet. I'm currently using it in a couple places in the next set of changes I'm working on. | ||
| return accessPatternStack.peek(); | ||
| } | ||
|  | ||
| /** | ||
| * Adds an index to the index history for this document, returning true if the index | ||
| * was added to the index history (i.e. if it wasn't already in the index history). | ||
|  | ||
Uh oh!
There was an error while loading. Please reload this page.