|
62 | 62 | import org.elasticsearch.xpack.esql.plan.logical.Fork; |
63 | 63 | import org.elasticsearch.xpack.esql.plan.logical.Keep; |
64 | 64 | import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan; |
65 | | -import org.elasticsearch.xpack.esql.plan.logical.Merge; |
66 | 65 | import org.elasticsearch.xpack.esql.plan.logical.Project; |
67 | 66 | import org.elasticsearch.xpack.esql.plan.logical.RegexExtract; |
68 | 67 | import org.elasticsearch.xpack.esql.plan.logical.UnresolvedRelation; |
@@ -265,50 +264,41 @@ private void executeSubPlan( |
265 | 264 | LocalRelation resultWrapper = resultToPlan(tuple.logical, result); |
266 | 265 |
|
267 | 266 | // replace the original logical plan with the backing result |
268 | | - final PhysicalPlan newPlan = plan.transformUp(PhysicalPlan.class, p -> { |
269 | | - if (p instanceof FragmentExec f) { |
270 | | - LogicalPlan frag = f.fragment(); |
271 | | - |
272 | | - return f.withFragment(frag.transformUp(LogicalPlan.class, bp -> { |
273 | | - if (bp instanceof InlineJoin ij && ij.right() == tuple.logical) { |
274 | | - return InlineJoin.inlineData(ij, resultWrapper); |
275 | | - } else if (bp instanceof Merge mr && mr.children().stream().anyMatch(sp -> sp == tuple.logical)) { |
276 | | - List<LogicalPlan> newSubPlans = new ArrayList<>(mr.children()); |
277 | | - for (int i = 0; i < newSubPlans.size(); i++) { |
278 | | - if (newSubPlans.get(i) == tuple.logical) { |
279 | | - newSubPlans.set(i, resultWrapper); |
280 | | - } |
281 | | - } |
282 | | - return new Merge(mr.source(), newSubPlans); |
283 | | - } |
284 | | - return bp; |
285 | | - })); |
286 | | - } |
287 | | - if (p instanceof MergeExec m) { |
288 | | - boolean anyMatch = m.children() |
289 | | - .stream() |
290 | | - .filter(sp -> FragmentExec.class.isAssignableFrom(sp.getClass())) |
291 | | - .map(FragmentExec.class::cast) |
292 | | - .anyMatch(fragmentExec -> fragmentExec.fragment() == tuple.logical); |
293 | | - if (anyMatch) { |
294 | | - List<PhysicalPlan> newSubPlans = new ArrayList<>(m.children()); |
295 | | - for (int i = 0; i < newSubPlans.size(); i++) { |
296 | | - var subPlan = newSubPlans.get(i); |
297 | | - if (subPlan instanceof FragmentExec fe && fe.fragment() == tuple.logical) { |
298 | | - var resultsExec = new LocalSourceExec( |
299 | | - resultWrapper.source(), |
300 | | - resultWrapper.output(), |
301 | | - resultWrapper.supplier() |
302 | | - ); |
303 | | - newSubPlans.set(i, resultsExec); |
304 | | - } |
| 267 | + PhysicalPlan newPlan = plan.transformUp(FragmentExec.class, f -> { |
| 268 | + LogicalPlan frag = f.fragment(); |
| 269 | + return f.withFragment( |
| 270 | + frag.transformUp( |
| 271 | + InlineJoin.class, |
| 272 | + ij -> ij.right() == tuple.logical ? InlineJoin.inlineData(ij, resultWrapper) : ij |
| 273 | + ) |
| 274 | + ); |
| 275 | + }); |
| 276 | + |
| 277 | + // replace the original logical plan with the backing result, in MergeExec |
| 278 | + newPlan = newPlan.transformUp(MergeExec.class, m -> { |
| 279 | + boolean changed = m.children() |
| 280 | + .stream() |
| 281 | + .filter(sp -> FragmentExec.class.isAssignableFrom(sp.getClass())) |
| 282 | + .map(FragmentExec.class::cast) |
| 283 | + .anyMatch(fragmentExec -> fragmentExec.fragment() == tuple.logical); |
| 284 | + if (changed) { |
| 285 | + List<PhysicalPlan> newSubPlans = new ArrayList<>(m.children()); |
| 286 | + for (int i = 0; i < newSubPlans.size(); i++) { |
| 287 | + var subPlan = newSubPlans.get(i); |
| 288 | + if (subPlan instanceof FragmentExec fe && fe.fragment() == tuple.logical) { |
| 289 | + var resultsExec = new LocalSourceExec( |
| 290 | + resultWrapper.source(), |
| 291 | + resultWrapper.output(), |
| 292 | + resultWrapper.supplier() |
| 293 | + ); |
| 294 | + newSubPlans.set(i, resultsExec); |
305 | 295 | } |
306 | | - return new MergeExec(m.source(), newSubPlans, m.output()); |
307 | 296 | } |
| 297 | + return new MergeExec(m.source(), newSubPlans, m.output()); |
308 | 298 | } |
309 | | - |
310 | | - return p; |
| 299 | + return m; |
311 | 300 | }); |
| 301 | + |
312 | 302 | if (subPlanIterator.hasNext() == false) { |
313 | 303 | runner.run(newPlan, next.delegateFailureAndWrap((finalListener, finalResult) -> { |
314 | 304 | profileAccumulator.addAll(finalResult.profiles()); |
|
0 commit comments