3636import java .util .HashSet ;
3737import java .util .List ;
3838import java .util .Set ;
39+ import java .util .stream .Collectors ;
3940
4041import static org .elasticsearch .xpack .esql .optimizer .rules .logical .PruneEmptyPlans .skipPlan ;
4142
@@ -49,7 +50,7 @@ public LogicalPlan apply(LogicalPlan plan) {
4950 return pruneColumns (plan , plan .outputSet ().asBuilder (), false );
5051 }
5152
52- static LogicalPlan pruneColumns (LogicalPlan plan , AttributeSet .Builder used , boolean inlineJoin ) {
53+ private static LogicalPlan pruneColumns (LogicalPlan plan , AttributeSet .Builder used , boolean inlineJoin ) {
5354 Holder <Boolean > forkPresent = new Holder <>(false );
5455 // while going top-to-bottom (upstream)
5556 return plan .transformDown (p -> {
@@ -96,7 +97,7 @@ static LogicalPlan pruneColumns(LogicalPlan plan, AttributeSet.Builder used, boo
9697 });
9798 }
9899
99- static LogicalPlan pruneColumnsInAggregate (Aggregate aggregate , AttributeSet .Builder used , boolean inlineJoin ) {
100+ private static LogicalPlan pruneColumnsInAggregate (Aggregate aggregate , AttributeSet .Builder used , boolean inlineJoin ) {
100101 LogicalPlan p = aggregate ;
101102
102103 var remaining = pruneUnusedAndAddReferences (aggregate .aggregates (), used );
@@ -136,7 +137,7 @@ static LogicalPlan pruneColumnsInAggregate(Aggregate aggregate, AttributeSet.Bui
136137 return p ;
137138 }
138139
139- static LogicalPlan pruneColumnsInInlineJoinRight (InlineJoin ij , AttributeSet .Builder used , Holder <Boolean > recheck ) {
140+ private static LogicalPlan pruneColumnsInInlineJoinRight (InlineJoin ij , AttributeSet .Builder used , Holder <Boolean > recheck ) {
140141 LogicalPlan p = ij ;
141142
142143 used .addAll (ij .references ());
@@ -157,7 +158,7 @@ static LogicalPlan pruneColumnsInInlineJoinRight(InlineJoin ij, AttributeSet.Bui
157158 return p ;
158159 }
159160
160- static LogicalPlan pruneColumnsInEval (Eval eval , AttributeSet .Builder used , Holder <Boolean > recheck ) {
161+ private static LogicalPlan pruneColumnsInEval (Eval eval , AttributeSet .Builder used , Holder <Boolean > recheck ) {
161162 LogicalPlan p = eval ;
162163
163164 var remaining = pruneUnusedAndAddReferences (eval .fields (), used );
@@ -175,7 +176,7 @@ static LogicalPlan pruneColumnsInEval(Eval eval, AttributeSet.Builder used, Hold
175176 }
176177
177178 // Note: only run when the Project is a descendent of an InlineJoin.
178- static LogicalPlan pruneColumnsInProject (Project project , AttributeSet .Builder used ) {
179+ private static LogicalPlan pruneColumnsInProject (Project project , AttributeSet .Builder used ) {
179180 LogicalPlan p = project ;
180181
181182 var remaining = pruneUnusedAndAddReferences (project .projections (), used );
@@ -186,7 +187,7 @@ static LogicalPlan pruneColumnsInProject(Project project, AttributeSet.Builder u
186187 return p ;
187188 }
188189
189- static LogicalPlan pruneColumnsInEsRelation (EsRelation esr , AttributeSet .Builder used ) {
190+ private static LogicalPlan pruneColumnsInEsRelation (EsRelation esr , AttributeSet .Builder used ) {
190191 LogicalPlan p = esr ;
191192
192193 if (esr .indexMode () == IndexMode .LOOKUP ) {
@@ -204,15 +205,11 @@ static LogicalPlan pruneColumnsInEsRelation(EsRelation esr, AttributeSet.Builder
204205
205206 private static LogicalPlan pruneColumnsInFork (Fork fork , AttributeSet .Builder used ) {
206207 // prune the output attributes of fork based on usage from the rest of the plan
207- // this does not consider the inner usage within each branch of the fork
208- // as those will be handled when traversing down each branch in PruneColumnsInForkBranches
209- LogicalPlan p = fork ;
210-
211208 // should exit early for UnionAll
212209 if (fork instanceof UnionAll ) {
213- return p ;
210+ return fork ;
214211 }
215- boolean changed = false ;
212+ boolean forkOutputChanged = false ;
216213 AttributeSet .Builder builder = AttributeSet .builder ();
217214 // if any of the fork outputs are used, keep them
218215 // otherwise, prune them based on the rest of the plan's usage
@@ -222,14 +219,71 @@ private static LogicalPlan pruneColumnsInFork(Fork fork, AttributeSet.Builder us
222219 if (attr .synthetic () || names .contains (attr .name ())) {
223220 builder .add (attr );
224221 } else {
225- changed = true ;
222+ forkOutputChanged = true ;
226223 }
227224 }
228- if (changed ) {
229- List <Attribute > attrs = builder .build ().stream ().toList ();
230- p = new Fork (fork .source (), fork .children (), attrs );
225+
226+ var pruneForkAttrs = forkOutputChanged ? builder .build ().stream ().toList () : fork .output ();
227+ // now that we have the pruned fork output attributes, we can proceed to apply pruning all children plan
228+ var usedFork = AttributeSet .forkBuilder ();
229+ usedFork .addAll (pruneForkAttrs );
230+ var forkOutputNames = pruneForkAttrs .stream ().map (NamedExpression ::name ).collect (Collectors .toSet ());
231+ boolean childrenChanged = false ;
232+ List <LogicalPlan > newChildren = new ArrayList <>();
233+ for (var child : fork .children ()) {
234+ var clonedUsed = AttributeSet .forkBuilder ().addAll (usedFork );
235+ var newChild = pruneSubPlan (child , clonedUsed , forkOutputNames );
236+ newChildren .add (newChild );
237+ if (false == newChild .equals (child )) {
238+ childrenChanged = true ;
239+ }
231240 }
232- return p ;
241+ if (childrenChanged ) {
242+ return new Fork (fork .source (), newChildren , pruneForkAttrs );
243+ } else if (forkOutputChanged ) {
244+ return new Fork (fork .source (), fork .children (), pruneForkAttrs );
245+ }
246+ return fork ;
247+ }
248+
249+ private static LogicalPlan pruneSubPlan (LogicalPlan plan , AttributeSet .Builder usedAttrs , Set <String > forkOutput ) {
250+ if (plan instanceof LocalRelation localRelation ) {
251+ var outputAttrs = localRelation .output ().stream ().filter (usedAttrs ::contains ).collect (Collectors .toList ());
252+ return new LocalRelation (localRelation .source (), outputAttrs , localRelation .supplier ());
253+ }
254+
255+ var projectHolder = new Holder <>(false );
256+ return plan .transformDown (p -> {
257+ if (p instanceof Limit || p instanceof Sample ) {
258+ return p ;
259+ }
260+
261+ var recheck = new Holder <Boolean >();
262+ do {
263+ // we operate using the names of the fields, rather than comparing the attributes directly,
264+ // as attributes may have been recreated during the transformations of fork branches.
265+ recheck .set (false );
266+ p = switch (p ) {
267+ case Aggregate agg -> pruneColumnsInAggregate (agg , usedAttrs , false );
268+ case InlineJoin inj -> pruneColumnsInInlineJoinRight (inj , usedAttrs , recheck );
269+ case Eval eval -> pruneColumnsInEval (eval , usedAttrs , recheck );
270+ case Project project -> {
271+ // process only the direct Project after Fork, but skip any subsequent instances
272+ if (projectHolder .get ()) {
273+ yield p ;
274+ } else {
275+ projectHolder .set (true );
276+ var prunedAttrs = project .projections ().stream ().filter (x -> forkOutput .contains (x .name ())).toList ();
277+ yield new Project (project .source (), project .child (), prunedAttrs );
278+ }
279+ }
280+ case EsRelation esr -> pruneColumnsInEsRelation (esr , usedAttrs );
281+ default -> p ;
282+ };
283+ } while (recheck .get ());
284+ usedAttrs .addAll (p .references ());
285+ return p ;
286+ });
233287 }
234288
235289 private static LogicalPlan emptyLocalRelation (UnaryPlan plan ) {
0 commit comments