From bfb9dee47055efe774ddfc47d32351dc18901c4a Mon Sep 17 00:00:00 2001
From: Ioana Tagirta
Date: Mon, 24 Mar 2025 14:04:59 +0100
Subject: [PATCH 1/7] Stream result pages from sub plans for FORK

---
 .../xpack/esql/action/ForkIT.java             | 26 +++++
 .../rules/physical/ProjectAwayColumns.java    |  5 +
 .../xpack/esql/planner/PlannerUtils.java      | 21 ++++
 .../xpack/esql/planner/mapper/Mapper.java     |  8 +-
 .../xpack/esql/plugin/ComputeService.java     | 96 ++++++++++++++++++-
 .../xpack/esql/session/EsqlSession.java       | 38 --------
 .../elasticsearch/xpack/esql/CsvTests.java    |  4 +
 7 files changed, 150 insertions(+), 48 deletions(-)

diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/ForkIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/ForkIT.java
index 58788764c6ed3..152119ea5a7ce 100644
--- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/ForkIT.java
+++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/ForkIT.java
@@ -72,6 +72,29 @@ private void testSimpleImpl(String query) {
         }
     }
 
+    public void testRow() {
+        var query = """
+            ROW a = [1, 2, 3, 4], b = 100
+            | MV_EXPAND a
+            | FORK (WHERE a % 2 == 1)
+                   (WHERE a % 2 == 0)
+            | SORT _fork, a
+            """;
+
+        try (var resp = run(query)) {
+            assertColumnNames(resp.columns(), List.of("a", "b", "_fork"));
+            assertColumnTypes(resp.columns(), List.of("integer", "integer", "keyword"));
+
+            Iterable<Iterable<Object>> expectedValues = List.of(
+                List.of(1, 100, "fork1"),
+                List.of(3, 100, "fork1"),
+                List.of(2, 100, "fork2"),
+                List.of(4, 100, "fork2")
+            );
+            assertValues(resp.values(), expectedValues);
+        }
+    }
+
     public void testSortAndLimitInFirstSubQuery() {
         var query = """
             FROM test
@@ -216,6 +239,7 @@ public void testWhereSortOnlyInFork() {
             ( WHERE content:"fox" | SORT id )
             ( WHERE content:"dog" | SORT id )
             | KEEP _fork, id, content
+            | SORT _fork, id
             """;
         var queryWithMatchFunction = """
             FROM test
@@ -223,6 +247,7 @@ public void testWhereSortOnlyInFork() {
             ( WHERE match(content, "fox") | SORT id )
             ( WHERE match(content, "dog") | SORT id )
             | KEEP _fork, id, content
+            | SORT _fork, id
             """;
         for (var query : List.of(queryWithMatchOperator, queryWithMatchFunction)) {
             try (var resp = run(query)) {
@@ -509,6 +534,7 @@ public void testWithEvalSimple() {
             | FORK ( EVAL a = 1 )
                    ( EVAL a = 2 )
             | KEEP a, _fork, id, content
+            | SORT _fork
             """;
 
         try (var resp = run(query)) {
diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/physical/ProjectAwayColumns.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/physical/ProjectAwayColumns.java
index 272a4d4a879f3..02930b6362363 100644
--- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/physical/ProjectAwayColumns.java
+++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/physical/ProjectAwayColumns.java
@@ -19,6 +19,7 @@
 import org.elasticsearch.xpack.esql.plan.logical.Project;
 import org.elasticsearch.xpack.esql.plan.physical.ExchangeExec;
 import org.elasticsearch.xpack.esql.plan.physical.FragmentExec;
+import org.elasticsearch.xpack.esql.plan.physical.MergeExec;
 import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan;
 import org.elasticsearch.xpack.esql.rule.Rule;
 
@@ -45,6 +46,10 @@ public PhysicalPlan apply(PhysicalPlan plan) {
         // This will require updating should we choose to have non-unary execution plans in the future.
         return plan.transformDown(currentPlanNode -> {
+            if (currentPlanNode instanceof MergeExec) {
+                keepTraversing.set(FALSE);
+            }
+
             if (keepTraversing.get() == false) {
                 return currentPlanNode;
             }
diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/PlannerUtils.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/PlannerUtils.java
index ec87016ccba31..32a82e94f11e7 100644
--- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/PlannerUtils.java
+++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/PlannerUtils.java
@@ -36,6 +36,7 @@
 import org.elasticsearch.xpack.esql.plan.QueryPlan;
 import org.elasticsearch.xpack.esql.plan.logical.EsRelation;
 import org.elasticsearch.xpack.esql.plan.logical.Filter;
+import org.elasticsearch.xpack.esql.plan.logical.Project;
 import org.elasticsearch.xpack.esql.plan.physical.AggregateExec;
 import org.elasticsearch.xpack.esql.plan.physical.EsSourceExec;
 import org.elasticsearch.xpack.esql.plan.physical.EstimatesRowSize;
@@ -43,6 +44,7 @@
 import org.elasticsearch.xpack.esql.plan.physical.ExchangeSinkExec;
 import org.elasticsearch.xpack.esql.plan.physical.ExchangeSourceExec;
 import org.elasticsearch.xpack.esql.plan.physical.FragmentExec;
+import org.elasticsearch.xpack.esql.plan.physical.MergeExec;
 import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan;
 import org.elasticsearch.xpack.esql.planner.mapper.LocalMapper;
 import org.elasticsearch.xpack.esql.planner.mapper.Mapper;
@@ -67,6 +69,25 @@
 
 public class PlannerUtils {
 
+    public static Tuple<List<PhysicalPlan>, PhysicalPlan> breakPlanIntoSubPlansAndMainPlan(PhysicalPlan plan) {
+        var subplans = new Holder<List<PhysicalPlan>>();
+        PhysicalPlan mainPlan = plan.transformUp(MergeExec.class, me -> {
+            subplans.set(me.children().stream().map(child -> {
+                // TODO: we are adding a Project plan to force InsertFieldExtraction - we should remove this transformation
+                child = child.transformUp(FragmentExec.class, f -> {
+                    var logicalFragment = f.fragment();
+                    logicalFragment = new Project(logicalFragment.source(), logicalFragment, logicalFragment.output());
+                    return new FragmentExec(logicalFragment);
+                });
+
+                return (PhysicalPlan) new ExchangeSinkExec(child.source(), child.output(), false, child);
+            }).toList());
+            return new ExchangeSourceExec(me.source(), me.output(), false);
+        });
+
+        return new Tuple<>(subplans.get(), mainPlan);
+    }
+
     public static Tuple<PhysicalPlan, PhysicalPlan> breakPlanBetweenCoordinatorAndDataNode(PhysicalPlan plan, Configuration config) {
         var dataNodePlan = new Holder<PhysicalPlan>();
 
diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/Mapper.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/Mapper.java
index af7db0f190c03..ab137fe872795 100644
--- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/Mapper.java
+++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/Mapper.java
@@ -41,7 +41,6 @@
 import org.elasticsearch.xpack.esql.plan.physical.UnaryExec;
 import org.elasticsearch.xpack.esql.plan.physical.inference.RerankExec;
 
-import java.util.ArrayList;
 import java.util.List;
 
 /**
@@ -235,12 +234,7 @@ private PhysicalPlan mapBinary(BinaryPlan bp) {
     }
 
     private PhysicalPlan mapFork(Fork fork) {
-        List<PhysicalPlan> physicalChildren = new ArrayList<>();
-        for (var child : fork.children()) {
-            var mappedChild = new FragmentExec(child);
-            physicalChildren.add(mappedChild);
-        }
-        return new MergeExec(fork.source(), physicalChildren, fork.output());
+        return new MergeExec(fork.source(), fork.children().stream().map(child -> map(child)).toList(), fork.output());
     }
 
     public static boolean isPipelineBreaker(LogicalPlan p) {
diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java
index 6ebaeca1e56c0..127b826e1d59f 100644
--- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java
+++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java
@@ -23,6 +23,8 @@
 import org.elasticsearch.compute.operator.DriverTaskRunner;
 import org.elasticsearch.compute.operator.FailureCollector;
 import org.elasticsearch.compute.operator.exchange.ExchangeService;
+import org.elasticsearch.compute.operator.exchange.ExchangeSink;
+import org.elasticsearch.compute.operator.exchange.ExchangeSinkHandler;
 import org.elasticsearch.compute.operator.exchange.ExchangeSourceHandler;
 import org.elasticsearch.core.Releasable;
 import org.elasticsearch.core.Releasables;
@@ -175,6 +177,89 @@ public void execute(
         FoldContext foldContext,
         EsqlExecutionInfo execInfo,
         ActionListener<Result> listener
+    ) {
+        Tuple<List<PhysicalPlan>, PhysicalPlan> subplansAndMainPlan = PlannerUtils.breakPlanIntoSubPlansAndMainPlan(physicalPlan);
+
+        List<PhysicalPlan> subplans = subplansAndMainPlan.v1();
+
+        // we have no sub plans, so we can just execute the given plan
+        if (subplans == null || subplans.size() == 0) {
+            executePlan(sessionId, rootTask, physicalPlan, configuration, foldContext, execInfo, listener, null);
+            return;
+        }
+
+        final List<Page> collectedPages = Collections.synchronizedList(new ArrayList<>());
+        PhysicalPlan mainPlan = new OutputExec(subplansAndMainPlan.v2(), collectedPages::add);
+
+        listener = listener.delegateResponse((l, e) -> {
+            collectedPages.forEach(p -> Releasables.closeExpectNoException(p::releaseBlocks));
+            l.onFailure(e);
+        });
+
+        var mainSessionId = newChildSession(sessionId);
+        QueryPragmas queryPragmas = configuration.pragmas();
+
+        ExchangeSourceHandler mainExchangeSource = new ExchangeSourceHandler(
+            queryPragmas.exchangeBufferSize(),
+            transportService.getThreadPool().executor(ThreadPool.Names.SEARCH)
+        );
+
+        exchangeService.addExchangeSourceHandler(mainSessionId, mainExchangeSource);
+        var finalListener = ActionListener.runBefore(listener, () -> exchangeService.removeExchangeSourceHandler(sessionId));
+
+        for (PhysicalPlan subplan : subplans) {
+            var childSessionId = newChildSession(sessionId);
+            ExchangeSinkHandler exchangeSink = exchangeService.createSinkHandler(childSessionId, queryPragmas.exchangeBufferSize());
+            // funnel sub plan pages into the main plan exchange source
+            mainExchangeSource.addRemoteSink(exchangeSink::fetchPageAsync, true, () -> {}, 1, ActionListener.noop());
+            executePlan(childSessionId, rootTask, subplan, configuration, foldContext, execInfo, ActionListener.wrap(result -> {
+                exchangeSink.addCompletionListener(
+                    ActionListener.running(() -> { exchangeService.finishSinkHandler(childSessionId, null); })
+                );
+            }, e -> {
+                exchangeService.finishSinkHandler(childSessionId, e);
+                finalListener.onFailure(e);
+            }), () -> exchangeSink.createExchangeSink(() -> {}));
+        }
+
+        var computeContext = new ComputeContext(
+            newChildSession(sessionId),
+            "single",
+            LOCAL_CLUSTER,
+            List.of(),
+            configuration,
+            foldContext,
+            mainExchangeSource::createExchangeSource,
+            null
+        );
+
+        Runnable cancelQueryOnFailure = cancelQueryOnFailure(rootTask);
+
+        PhysicalPlan finalMainPlan = mainPlan;
+
+        try (
+            ComputeListener localListener = new ComputeListener(
+                transportService.getThreadPool(),
+                cancelQueryOnFailure,
+                finalListener.map(profiles -> {
+                    execInfo.markEndQuery();
+                    return new Result(finalMainPlan.output(), collectedPages, profiles, execInfo);
+                })
+            )
+        ) {
+            runCompute(rootTask, computeContext, finalMainPlan, localListener.acquireCompute());
+        }
+    }
+
+    public void executePlan(
+        String sessionId,
+        CancellableTask rootTask,
+        PhysicalPlan physicalPlan,
+        Configuration configuration,
+        FoldContext foldContext,
+        EsqlExecutionInfo execInfo,
+        ActionListener<Result> listener,
+        Supplier<ExchangeSink> exchangeSinkSupplier
     ) {
         Tuple<PhysicalPlan, PhysicalPlan> coordinatorAndDataNodePlan = PlannerUtils.breakPlanBetweenCoordinatorAndDataNode(
             physicalPlan,
@@ -185,7 +270,12 @@ public void execute(
             collectedPages.forEach(p -> Releasables.closeExpectNoException(p::releaseBlocks));
             l.onFailure(e);
         });
-        PhysicalPlan coordinatorPlan = new OutputExec(coordinatorAndDataNodePlan.v1(), collectedPages::add);
+        PhysicalPlan coordinatorPlan = coordinatorAndDataNodePlan.v1();
+
+        if (exchangeSinkSupplier == null) {
+            coordinatorPlan = new OutputExec(coordinatorAndDataNodePlan.v1(), collectedPages::add);
+        }
+
         PhysicalPlan dataNodePlan = coordinatorAndDataNodePlan.v2();
         if (dataNodePlan != null && dataNodePlan instanceof ExchangeSinkExec == false) {
             assert false : "expected data node plan starts with an ExchangeSink; got " + dataNodePlan;
@@ -211,7 +301,7 @@ public void execute(
                 configuration,
                 foldContext,
                 null,
-                null
+                exchangeSinkSupplier
             );
             updateShardCountForCoordinatorOnlyQuery(execInfo);
             try (
@@ -287,7 +377,7 @@ public void execute(
                         configuration,
                         foldContext,
                         exchangeSource::createExchangeSource,
-                        null
+                        exchangeSinkSupplier
                     ),
                     coordinatorPlan,
                     localListener.acquireCompute()
diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java
index 9330df751c00d..ec78b1080894d 100644
--- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java
+++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java
@@ -75,8 +75,6 @@
 import org.elasticsearch.xpack.esql.plan.logical.local.LocalSupplier;
 import org.elasticsearch.xpack.esql.plan.physical.EstimatesRowSize;
 import org.elasticsearch.xpack.esql.plan.physical.FragmentExec;
-import org.elasticsearch.xpack.esql.plan.physical.LocalSourceExec;
-import org.elasticsearch.xpack.esql.plan.physical.MergeExec;
 import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan;
 import org.elasticsearch.xpack.esql.planner.mapper.Mapper;
 import org.elasticsearch.xpack.esql.planner.premapper.PreMapper;
@@ -225,22 +223,6 @@ private void executeSubPlans(
             });
         });
 
-        // Currently fork is limited and supported as streaming operators, similar to inlinestats
-        physicalPlan = physicalPlan.transformUp(MergeExec.class, m -> {
-            List<PhysicalPlan> newSubPlans = new ArrayList<>();
-            for (var plan : m.children()) {
-                if (plan instanceof FragmentExec fragmentExec) {
-                    LogicalPlan subplan = fragmentExec.fragment();
-                    subplan = optimizedPlan(subplan);
-                    PhysicalPlan subqueryPlan = logicalPlanToPhysicalPlan(subplan, request);
-                    subplans.add(new PlanTuple(subqueryPlan, subplan));
-                    plan = new FragmentExec(subplan);
-                }
-                newSubPlans.add(plan);
-            }
-            return new MergeExec(m.source(), newSubPlans, m.output());
-        });
-
         Iterator<PlanTuple> iterator = subplans.iterator();
 
         // TODO: merge into one method
@@ -279,26 +261,6 @@ private void executeSubPlan(
             );
         });
 
-        // replace the original logical plan with the backing result, in MergeExec
-        newPlan = newPlan.transformUp(MergeExec.class, m -> {
-            boolean changed = m.children()
-                .stream()
-                .filter(sp -> FragmentExec.class.isAssignableFrom(sp.getClass()))
-                .map(FragmentExec.class::cast)
-                .anyMatch(fragmentExec -> fragmentExec.fragment() == tuple.logical);
-            if (changed) {
-                List<PhysicalPlan> newSubPlans = m.children().stream().map(subPlan -> {
-                    if (subPlan instanceof FragmentExec fe && fe.fragment() == tuple.logical) {
-                        return new LocalSourceExec(resultWrapper.source(), resultWrapper.output(), resultWrapper.supplier());
-                    } else {
-                        return subPlan;
-                    }
-                }).toList();
-                return new MergeExec(m.source(), newSubPlans, m.output());
-            }
-            return m;
-        });
-
         if (subPlanIterator.hasNext() == false) {
             runner.run(newPlan, next.delegateFailureAndWrap((finalListener, finalResult) -> {
                 profileAccumulator.addAll(finalResult.profiles());
diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/CsvTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/CsvTests.java
index 189ec7774e468..31bffab31c267 100644
--- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/CsvTests.java
+++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/CsvTests.java
@@ -307,6 +307,10 @@ public final void test() throws Throwable {
             "CSV tests cannot currently handle scoring that depends on Lucene",
             testCase.requiredCapabilities.contains(EsqlCapabilities.Cap.METADATA_SCORE.capabilityName())
         );
+        assumeFalse(
+            "CSV tests cannot currently handle FORK",
+            testCase.requiredCapabilities.contains(EsqlCapabilities.Cap.FORK.capabilityName())
+        );
 
         if (Build.current().isSnapshot()) {
             assertThat(

From c209f02f1d50d8935016b386325ad7f7793c8a8f Mon Sep 17 00:00:00 2001
From: Ioana Tagirta
Date: Mon, 14 Apr 2025 10:35:15 +0200
Subject: [PATCH 2/7] Use different capability for csv tests

---
 .../src/main/resources/fork.csv-spec          | 20 +++++++++----------
 .../xpack/esql/action/EsqlCapabilities.java   |  4 ++--
 .../elasticsearch/xpack/esql/CsvTests.java    |  2 +-
 3 files changed, 13 insertions(+), 13 deletions(-)

diff --git a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/fork.csv-spec b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/fork.csv-spec
index 2dcbc3f48cabe..5a4865517064a 100644
--- a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/fork.csv-spec
+++ b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/fork.csv-spec
@@ -3,7 +3,7 @@
 //
 
 simpleFork
-required_capability: fork
+required_capability: fork_v3
 
 FROM employees
 | FORK ( WHERE emp_no == 10001 )
@@ -18,7 +18,7 @@ emp_no:integer | _fork:keyword
 ;
 
 forkWithWhereSortAndLimit
-required_capability: fork
+required_capability: fork_v3
 
 FROM employees
 | FORK ( WHERE hire_date < "1985-03-01T00:00:00Z" | SORT first_name | LIMIT 5 )
@@ -38,7 +38,7 @@ emp_no:integer | first_name:keyword | _fork:keyword
 ;
 
 fiveFork
-required_capability: fork
+required_capability: fork_v3
 
 FROM employees
 | FORK ( WHERE emp_no == 10005 )
@@ -59,7 +59,7 @@ fork5 | 10001
 ;
 
 forkWithWhereSortDescAndLimit
-required_capability: fork
+required_capability: fork_v3
 
 FROM employees
 | FORK ( WHERE hire_date < "1985-03-01T00:00:00Z" | SORT first_name DESC | LIMIT 2 )
@@ -76,7 +76,7 @@ fork2 | 10087  | Xinglin
 ;
 
 forkWithCommonPrefilter
-required_capability: fork
+required_capability: fork_v3
 
 FROM employees
 | WHERE emp_no > 10050
@@ -94,7 +94,7 @@ fork2 | 10100
 ;
 
 forkWithSemanticSearchAndScore
-required_capability: fork
+required_capability: fork_v3
 required_capability: semantic_text_field_caps
 required_capability: metadata_score
 
@@ -114,7 +114,7 @@ fork2 | 6.093784261960139E18 | 2  | all we have to decide is w
 ;
 
 forkWithEvals
-required_capability: fork_v2
+required_capability: fork_v3
 
 FROM employees
 | FORK (WHERE emp_no == 10048 OR emp_no == 10081 | EVAL x = "abc" | EVAL y = 1)
@@ -131,7 +131,7 @@ fork2 | 10087  | def  | null | 2
 ;
 
 forkWithStats
-required_capability: fork_v2
+required_capability: fork_v3
 
 FROM employees
 | FORK (WHERE emp_no == 10048 OR emp_no == 10081)
@@ -152,7 +152,7 @@ fork4 | null | 100  | 10001  | null
 ;
 
 forkWithDissect
-required_capability: fork_v2
+required_capability: fork_v3
 
 FROM employees
 | WHERE emp_no == 10048 OR emp_no == 10081
@@ -172,7 +172,7 @@ fork2 | 10081  | Rosen  | 10081  | null  | Zhongwei
 ;
 
 forkWithMixOfCommands
-required_capability: fork_v2
+required_capability: fork_v3
 
 FROM employees
 | WHERE emp_no == 10048 OR emp_no == 10081
diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java
index 406a7f5f6f08d..38a75cf756034 100644
--- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java
+++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java
@@ -978,9 +978,9 @@ public enum Cap {
     MAX_OVER_TIME(Build.current().isSnapshot()),
 
     /**
-     * Support STATS/EVAL/DISSECT in Fork branches
+     * Support streaming of sub plan results
      */
-    FORK_V2(Build.current().isSnapshot()),
+    FORK_V3(Build.current().isSnapshot()),
 
     /**
      * Does the usage information for ESQL contain a histogram of {@code took} values?
diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/CsvTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/CsvTests.java
index 31bffab31c267..65c77c4a64c31 100644
--- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/CsvTests.java
+++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/CsvTests.java
@@ -309,7 +309,7 @@ public final void test() throws Throwable {
         );
         assumeFalse(
             "CSV tests cannot currently handle FORK",
-            testCase.requiredCapabilities.contains(EsqlCapabilities.Cap.FORK.capabilityName())
+            testCase.requiredCapabilities.contains(EsqlCapabilities.Cap.FORK_V3.capabilityName())
         );
 
         if (Build.current().isSnapshot()) {

From 07a0b20f8bd3cdd280f4eed1c30a5f9798ee780a Mon Sep 17 00:00:00 2001
From: Ioana Tagirta
Date: Mon, 14 Apr 2025 12:27:33 +0200
Subject: [PATCH 3/7] Add javadoc

---
 .../elasticsearch/xpack/esql/planner/PlannerUtils.java      | 9 +++++++++
 1 file changed, 9 insertions(+)

diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/PlannerUtils.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/PlannerUtils.java
index 32a82e94f11e7..c89dae29e592e 100644
--- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/PlannerUtils.java
+++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/PlannerUtils.java
@@ -69,6 +69,15 @@
 
 public class PlannerUtils {
 
+    /**
+     * When the plan contains children like {@code MergeExec}, resulting from the planning of commands such as FORK,
+     * we need to break the plan into sub plans and a main coordinator plan.
+     * The result pages from each sub plan will be funneled to the main coordinator plan.
+     * To achieve this, we wire each sub plan with an {@code ExchangeSinkExec} and add an {@code ExchangeSourceExec}
+     * to the main coordinator plan.
+     * There is an additional split of each sub plan into a data node plan and a coordinator plan.
+     * This split is not done here, but as part of {@code PlannerUtils#breakPlanBetweenCoordinatorAndDataNode}.
+     */
     public static Tuple<List<PhysicalPlan>, PhysicalPlan> breakPlanIntoSubPlansAndMainPlan(PhysicalPlan plan) {
         var subplans = new Holder<List<PhysicalPlan>>();
         PhysicalPlan mainPlan = plan.transformUp(MergeExec.class, me -> {

From 7b0aaa3afb9f65bd3030b7bc398a625070a07014 Mon Sep 17 00:00:00 2001
From: Ioana Tagirta
Date: Tue, 15 Apr 2025 11:18:28 +0200
Subject: [PATCH 4/7] Fix RRF tests

---
 .../compute/operator/RrfScoreEvalOperator.java              | 7 +++++--
 .../esql/qa/testFixtures/src/main/resources/rrf.csv-spec    | 1 +
 .../java/org/elasticsearch/xpack/esql/action/RrfIT.java     | 1 +
 3 files changed, 7 insertions(+), 2 deletions(-)

diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/RrfScoreEvalOperator.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/RrfScoreEvalOperator.java
index 452faa5fbd611..9dc24e9e0f98d 100644
--- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/RrfScoreEvalOperator.java
+++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/RrfScoreEvalOperator.java
@@ -68,8 +68,11 @@ protected Page process(Page page) {
         for (int i = 0; i < page.getBlockCount() - 1; i++) {
             projections[i] = i == scorePosition ? page.getBlockCount() - 1 : i;
         }
-
-        return page.projectBlocks(projections);
+        try {
+            return page.projectBlocks(projections);
+        } finally {
+            page.releaseBlocks();
+        }
     }
 
     @Override
diff --git a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/rrf.csv-spec b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/rrf.csv-spec
index 8a56aff09ee0b..8fdef43c73f53 100644
--- a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/rrf.csv-spec
+++ b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/rrf.csv-spec
@@ -120,6 +120,7 @@ FROM semantic_text METADATA _id, _score, _index
        ( WHERE semantic_text_field:"something else" | SORT _score DESC | LIMIT 2)
 | RRF
 | EVAL _score = round(_score, 4)
+| EVAL _fork = mv_sort(_fork)
 | KEEP _fork, _score, _id, semantic_text_field
 ;
diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/RrfIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/RrfIT.java
index 3ffd235882886..4986d11ddcc81 100644
--- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/RrfIT.java
+++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/RrfIT.java
@@ -40,6 +40,7 @@ public void testRrf() {
             ( WHERE content:"fox" | SORT _score, _id DESC )
             ( WHERE content:"dog" | SORT _score, _id DESC )
             | RRF
+            | EVAL _fork = mv_sort(_fork)
             | EVAL _score = round(_score, 4)
             | KEEP id, content, _score, _fork
             """;

From 753632d00fe2428b346701dfba83ede11b5da442 Mon Sep 17 00:00:00 2001
From: Ioana Tagirta
Date: Tue, 15 Apr 2025 11:23:51 +0200
Subject: [PATCH 5/7] Remove MergeOperator since it's no longer needed

---
 .../compute/operator/MergeOperator.java       | 85 -------------------
 .../esql/planner/LocalExecutionPlanner.java   | 11 ---
 2 files changed, 96 deletions(-)
 delete mode 100644 x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/MergeOperator.java

diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/MergeOperator.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/MergeOperator.java
deleted file mode 100644
index bb6cbbe15b728..0000000000000
--- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/MergeOperator.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
- * or more contributor license agreements. Licensed under the Elastic License
- * 2.0; you may not use this file except in compliance with the Elastic License
- * 2.0.
- */
-
-package org.elasticsearch.compute.operator;
-
-import org.elasticsearch.compute.data.Block;
-import org.elasticsearch.compute.data.BlockFactory;
-import org.elasticsearch.compute.data.Page;
-import org.elasticsearch.core.Releasables;
-
-import java.util.List;
-import java.util.ListIterator;
-import java.util.function.Supplier;
-
-/**
- * A merge operator is effectively a "fan-in" operator - accepts input
- * from several sources and provides it in a single output.
- */
-public class MergeOperator extends SourceOperator {
-
-    public record MergeOperatorFactory(BlockSuppliers suppliers) implements SourceOperatorFactory {
-        @Override
-        public String describe() {
-            return "MergeOperator[suppliers=" + suppliers + "]";
-        }
-
-        @Override
-        public SourceOperator get(DriverContext driverContext) {
-            return new MergeOperator(driverContext.blockFactory(), suppliers);
-        }
-    }
-
-    private final BlockFactory blockFactory;
-    private final BlockSuppliers suppliers;
-    private boolean finished;
-    private ListIterator<Block[]> subPlanBlocks;
-
-    public MergeOperator(BlockFactory blockFactory, BlockSuppliers suppliers) {
-        super();
-        this.blockFactory = blockFactory;
-        this.suppliers = suppliers;
-    }
-
-    public interface BlockSuppliers extends Supplier<List<Block[]>> {};
-
-    @Override
-    public void finish() {}
-
-    @Override
-    public boolean isFinished() {
-        return finished;
-    }
-
-    @Override
-    public Page getOutput() {
-        if (subPlanBlocks == null) {
-            subPlanBlocks = suppliers.get().listIterator();
-        }
-
-        if (subPlanBlocks.hasNext()) {
-            return new Page(subPlanBlocks.next());
-        }
-        finished = true;
-        return null;
-    }
-
-    @Override
-    public void close() {
-        // release blocks from any subplan not fully consumed.
- if (subPlanBlocks != null) { - while (subPlanBlocks.hasNext()) { - Releasables.close(subPlanBlocks.next()); - } - } - } - - @Override - public String toString() { - return "MergeOperator[subPlanBlocks=" + subPlanBlocks + "]"; - } -} diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlanner.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlanner.java index 3ea9212ef9572..f263c2f80e429 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlanner.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlanner.java @@ -31,7 +31,6 @@ import org.elasticsearch.compute.operator.LimitOperator; import org.elasticsearch.compute.operator.LocalSourceOperator; import org.elasticsearch.compute.operator.LocalSourceOperator.LocalSourceFactory; -import org.elasticsearch.compute.operator.MergeOperator; import org.elasticsearch.compute.operator.MvExpandOperator; import org.elasticsearch.compute.operator.Operator; import org.elasticsearch.compute.operator.Operator.OperatorFactory; @@ -103,7 +102,6 @@ import org.elasticsearch.xpack.esql.plan.physical.LimitExec; import org.elasticsearch.xpack.esql.plan.physical.LocalSourceExec; import org.elasticsearch.xpack.esql.plan.physical.LookupJoinExec; -import org.elasticsearch.xpack.esql.plan.physical.MergeExec; import org.elasticsearch.xpack.esql.plan.physical.MvExpandExec; import org.elasticsearch.xpack.esql.plan.physical.OutputExec; import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan; @@ -281,8 +279,6 @@ else if (node instanceof OutputExec outputExec) { return planOutput(outputExec, context); } else if (node instanceof ExchangeSinkExec exchangeSink) { return planExchangeSink(exchangeSink, context); - } else if (node instanceof MergeExec mergeExec) { - return planMerge(mergeExec, context); } else if (node instanceof RrfScoreEvalExec rrf) { return planRrfScoreEvalExec(rrf, context); } @@ -804,13 +800,6 @@ private PhysicalOperation planChangePoint(ChangePointExec changePoint, LocalExec ); } - private PhysicalOperation planMerge(MergeExec mergeExec, LocalExecutionPlannerContext context) { - Layout.Builder layout = new Layout.Builder(); - layout.append(mergeExec.output()); - MergeOperator.BlockSuppliers suppliers = () -> mergeExec.suppliers().stream().map(s -> s.get()).toList(); - return PhysicalOperation.fromSource(new MergeOperator.MergeOperatorFactory(suppliers), layout.build()); - } - /** * Immutable physical operation. */ From 768c5815190fde3b6dbc817963a7b2520f5bb9ad Mon Sep 17 00:00:00 2001 From: Ioana Tagirta Date: Tue, 15 Apr 2025 12:15:05 +0200 Subject: [PATCH 6/7] Fix another RRF test --- .../plugin/esql/qa/testFixtures/src/main/resources/rrf.csv-spec | 1 + 1 file changed, 1 insertion(+) diff --git a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/rrf.csv-spec b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/rrf.csv-spec index 8fdef43c73f53..b4efd1b422089 100644 --- a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/rrf.csv-spec +++ b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/rrf.csv-spec @@ -76,6 +76,7 @@ FROM books METADATA _id, _index, _score ( WHERE author:"Ursula K. 
Le Guin" AND title:"short stories" | SORT _score, _id DESC | LIMIT 3) | RRF | STATS count_fork=COUNT(*) BY _fork +| SORT _fork ; count_fork:long | _fork:keyword From b6ecc104af7d2200b1e57c9c9520fa9eaa753716 Mon Sep 17 00:00:00 2001 From: Ioana Tagirta Date: Wed, 16 Apr 2025 11:53:10 +0200 Subject: [PATCH 7/7] Add empty sink to prevent exchange source to finish too early --- .../xpack/esql/plugin/ComputeService.java | 80 +++++++++---------- 1 file changed, 40 insertions(+), 40 deletions(-) diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java index 3312f25d3d9fd..ec5334fc44cf1 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java @@ -204,49 +204,49 @@ public void execute( ); exchangeService.addExchangeSourceHandler(mainSessionId, mainExchangeSource); - var finalListener = ActionListener.runBefore(listener, () -> exchangeService.removeExchangeSourceHandler(sessionId)); - - for (PhysicalPlan subplan : subplans) { - var childSessionId = newChildSession(sessionId); - ExchangeSinkHandler exchangeSink = exchangeService.createSinkHandler(childSessionId, queryPragmas.exchangeBufferSize()); - // funnel sub plan pages into the main plan exchange source - mainExchangeSource.addRemoteSink(exchangeSink::fetchPageAsync, true, () -> {}, 1, ActionListener.noop()); - executePlan(childSessionId, rootTask, subplan, configuration, foldContext, execInfo, ActionListener.wrap(result -> { - exchangeSink.addCompletionListener( - ActionListener.running(() -> { exchangeService.finishSinkHandler(childSessionId, null); }) - ); - }, e -> { - exchangeService.finishSinkHandler(childSessionId, e); - finalListener.onFailure(e); - }), () -> exchangeSink.createExchangeSink(() -> {})); - } + try (var ignored = mainExchangeSource.addEmptySink()) { + var finalListener = ActionListener.runBefore(listener, () -> exchangeService.removeExchangeSourceHandler(sessionId)); + var computeContext = new ComputeContext( + mainSessionId, + "single", + LOCAL_CLUSTER, + List.of(), + configuration, + foldContext, + mainExchangeSource::createExchangeSource, + null + ); - var computeContext = new ComputeContext( - newChildSession(sessionId), - "single", - LOCAL_CLUSTER, - List.of(), - configuration, - foldContext, - mainExchangeSource::createExchangeSource, - null - ); + Runnable cancelQueryOnFailure = cancelQueryOnFailure(rootTask); + PhysicalPlan finalMainPlan = mainPlan; - Runnable cancelQueryOnFailure = cancelQueryOnFailure(rootTask); + try ( + ComputeListener localListener = new ComputeListener( + transportService.getThreadPool(), + cancelQueryOnFailure, + finalListener.map(profiles -> { + execInfo.markEndQuery(); + return new Result(finalMainPlan.output(), collectedPages, profiles, execInfo); + }) + ) + ) { + runCompute(rootTask, computeContext, finalMainPlan, localListener.acquireCompute()); + } - PhysicalPlan finalMainPlan = mainPlan; - - try ( - ComputeListener localListener = new ComputeListener( - transportService.getThreadPool(), - cancelQueryOnFailure, - finalListener.map(profiles -> { - execInfo.markEndQuery(); - return new Result(finalMainPlan.output(), collectedPages, profiles, execInfo); - }) - ) - ) { - runCompute(rootTask, computeContext, finalMainPlan, localListener.acquireCompute()); + for (PhysicalPlan subplan : subplans) { + var childSessionId = 
newChildSession(sessionId); + ExchangeSinkHandler exchangeSink = exchangeService.createSinkHandler(childSessionId, queryPragmas.exchangeBufferSize()); + // funnel sub plan pages into the main plan exchange source + mainExchangeSource.addRemoteSink(exchangeSink::fetchPageAsync, true, () -> {}, 1, ActionListener.noop()); + executePlan(childSessionId, rootTask, subplan, configuration, foldContext, execInfo, ActionListener.wrap(result -> { + exchangeSink.addCompletionListener( + ActionListener.running(() -> { exchangeService.finishSinkHandler(childSessionId, null); }) + ); + }, e -> { + exchangeService.finishSinkHandler(childSessionId, e); + finalListener.onFailure(e); + }), () -> exchangeSink.createExchangeSink(() -> {})); + } } }
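
Below is a small, self-contained sketch (not Elasticsearch code, and not part of the patches above) of the fan-in pattern this series builds: each FORK sub plan pushes pages into its own sink while the main plan drains a single source, and the source holds an extra "empty sink" reference so that it cannot report completion before the sub plan sinks are attached, which is the race patch 7 fixes. All names here (FanInDemo, FanInSource, ToySink, ToyPage) are invented for illustration; the real classes are ExchangeSourceHandler, ExchangeSinkHandler and Page.

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

class FanInDemo {
    record ToyPage(String origin, int value) {}

    static class FanInSource {
        final BlockingQueue<ToyPage> buffer = new LinkedBlockingQueue<>();
        // Starts at 1: the coordinator holds an "empty sink" so the source
        // cannot look finished before any sub plan sink has been attached.
        final AtomicInteger outstandingSinks = new AtomicInteger(1);

        ToySink attachSink(String name) {
            outstandingSinks.incrementAndGet();
            return new ToySink(this, name);
        }

        void releaseEmptySink() {
            outstandingSinks.decrementAndGet();
        }

        boolean finished() {
            return outstandingSinks.get() == 0 && buffer.isEmpty();
        }
    }

    static class ToySink {
        private final FanInSource source;
        private final String name;

        ToySink(FanInSource source, String name) {
            this.source = source;
            this.name = name;
        }

        void addPage(int value) {
            source.buffer.add(new ToyPage(name, value));
        }

        void finish() {
            source.outstandingSinks.decrementAndGet();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        FanInSource source = new FanInSource();

        // Attach the sub plan sinks while the empty sink is still held,
        // mirroring the try-with-resources block added in patch 7.
        ToySink fork1 = source.attachSink("fork1");
        ToySink fork2 = source.attachSink("fork2");
        source.releaseEmptySink();

        Thread t1 = new Thread(() -> { fork1.addPage(1); fork1.addPage(3); fork1.finish(); });
        Thread t2 = new Thread(() -> { fork2.addPage(2); fork2.addPage(4); fork2.finish(); });
        t1.start();
        t2.start();

        // The main plan drains the single source until every sink has finished
        // (a busy-wait loop, kept deliberately simple for the sketch).
        while (source.finished() == false) {
            ToyPage page = source.buffer.poll();
            if (page != null) {
                System.out.println(page.origin() + " -> " + page.value());
            }
        }
        t1.join();
        t2.join();
    }
}

Note that pages from different sinks interleave nondeterministically, which is why the tests in this series add | SORT _fork (or mv_sort(_fork)) before asserting on results.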
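
The RrfScoreEvalOperator fix in patch 4 follows the block ownership rule the diff implies: page.projectBlocks(projections) produces a page holding its own block references, so the operator must release the input page afterwards, or its blocks leak now that pages stream through exchanges instead of being collected in one place. A minimal model of that rule, with invented ToyBlock/ToyPage types standing in for the real reference-counted Block and Page:

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

class ReleaseDemo {
    static class ToyBlock {
        private final AtomicInteger refs = new AtomicInteger(1);

        void incRef() {
            refs.incrementAndGet();
        }

        void decRef() {
            if (refs.decrementAndGet() == 0) {
                System.out.println("block freed");
            }
        }
    }

    static class ToyPage {
        final List<ToyBlock> blocks;

        ToyPage(List<ToyBlock> blocks) {
            this.blocks = blocks;
        }

        // Like Page#projectBlocks: the projected page takes its own references.
        ToyPage project(int[] projections) {
            List<ToyBlock> projected = new ArrayList<>();
            for (int i : projections) {
                ToyBlock block = blocks.get(i);
                block.incRef();
                projected.add(block);
            }
            return new ToyPage(projected);
        }

        void releaseBlocks() {
            blocks.forEach(ToyBlock::decRef);
        }
    }

    // The pattern from the RrfScoreEvalOperator fix: project first, then
    // release the input page in a finally block so its references are
    // dropped even if the projection throws.
    static ToyPage process(ToyPage page, int[] projections) {
        try {
            return page.project(projections);
        } finally {
            page.releaseBlocks();
        }
    }

    public static void main(String[] args) {
        ToyPage input = new ToyPage(List.of(new ToyBlock(), new ToyBlock()));
        ToyPage output = process(input, new int[] { 1 });
        // The block that was projected away is freed inside process(); the
        // surviving block is freed when the output page is released.
        output.releaseBlocks();
    }
}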
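
For reference, the shape of the rewrite that breakPlanIntoSubPlansAndMainPlan performs can be summarized with a toy plan tree: the MergeExec node is replaced by an ExchangeSourceExec in the main plan, and every child becomes a standalone sub plan wrapped in an ExchangeSinkExec. The sketch below uses invented stand-in types (ToyPlan, Merge, Sink, Source, Leaf) and only handles a root-level merge, whereas the real method rewrites via transformUp and also re-wraps each FragmentExec in a Project:

import java.util.List;

class SplitDemo {
    sealed interface ToyPlan permits Leaf, Merge, Sink, Source {}

    record Leaf(String name) implements ToyPlan {}              // stands in for a sub plan body
    record Merge(List<ToyPlan> children) implements ToyPlan {}  // stands in for MergeExec
    record Sink(ToyPlan child) implements ToyPlan {}            // stands in for ExchangeSinkExec
    record Source() implements ToyPlan {}                       // stands in for ExchangeSourceExec

    record Split(List<ToyPlan> subPlans, ToyPlan mainPlan) {}

    // Replace the merge node with an exchange source in the main plan and
    // wrap each of its children in an exchange sink, so their result pages
    // can be funneled into the source at runtime.
    static Split split(ToyPlan plan) {
        if (plan instanceof Merge merge) {
            List<ToyPlan> subPlans = merge.children().stream().map(child -> (ToyPlan) new Sink(child)).toList();
            return new Split(subPlans, new Source());
        }
        return new Split(List.of(), plan); // no FORK in the plan: run it unchanged
    }

    public static void main(String[] args) {
        ToyPlan forked = new Merge(List.of(new Leaf("fork1"), new Leaf("fork2")));
        Split result = split(forked);
        System.out.println(result.subPlans()); // [Sink[child=Leaf[name=fork1]], Sink[child=Leaf[name=fork2]]]
        System.out.println(result.mainPlan()); // Source[]
    }
}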