Merged
20 changes: 10 additions & 10 deletions x-pack/plugin/esql/qa/testFixtures/src/main/resources/fork.csv-spec
@@ -3,7 +3,7 @@
//

simpleFork
required_capability: fork
required_capability: fork_v3

FROM employees
| FORK ( WHERE emp_no == 10001 )
@@ -18,7 +18,7 @@ emp_no:integer | _fork:keyword
;

forkWithWhereSortAndLimit
required_capability: fork
required_capability: fork_v3

FROM employees
| FORK ( WHERE hire_date < "1985-03-01T00:00:00Z" | SORT first_name | LIMIT 5 )
@@ -38,7 +38,7 @@ emp_no:integer | first_name:keyword | _fork:keyword
;

fiveFork
required_capability: fork
required_capability: fork_v3

FROM employees
| FORK ( WHERE emp_no == 10005 )
@@ -59,7 +59,7 @@ fork5 | 10001
;

forkWithWhereSortDescAndLimit
required_capability: fork
required_capability: fork_v3

FROM employees
| FORK ( WHERE hire_date < "1985-03-01T00:00:00Z" | SORT first_name DESC | LIMIT 2 )
@@ -76,7 +76,7 @@ fork2 | 10087 | Xinglin
;

forkWithCommonPrefilter
required_capability: fork
required_capability: fork_v3

FROM employees
| WHERE emp_no > 10050
@@ -94,7 +94,7 @@ fork2 | 10100
;

forkWithSemanticSearchAndScore
required_capability: fork
required_capability: fork_v3
required_capability: semantic_text_field_caps
required_capability: metadata_score

@@ -114,7 +114,7 @@ fork2 | 6.093784261960139E18 | 2 | all we have to decide is w
;

forkWithEvals
required_capability: fork_v2
required_capability: fork_v3

FROM employees
| FORK (WHERE emp_no == 10048 OR emp_no == 10081 | EVAL x = "abc" | EVAL y = 1)
@@ -131,7 +131,7 @@ fork2 | 10087 | def | null | 2
;

forkWithStats
required_capability: fork_v2
required_capability: fork_v3

FROM employees
| FORK (WHERE emp_no == 10048 OR emp_no == 10081)
@@ -152,7 +152,7 @@ fork4 | null | 100 | 10001 | null
;

forkWithDissect
required_capability: fork_v2
required_capability: fork_v3

FROM employees
| WHERE emp_no == 10048 OR emp_no == 10081
@@ -172,7 +172,7 @@ fork2 | 10081 | Rosen | 10081 | null | Zhongwei
;

forkWithMixOfCommands
required_capability: fork_v2
required_capability: fork_v3

FROM employees
| WHERE emp_no == 10048 OR emp_no == 10081
@@ -72,6 +72,29 @@ private void testSimpleImpl(String query) {
}
}

public void testRow() {
var query = """
ROW a = [1, 2, 3, 4], b = 100
| MV_EXPAND a
| FORK (WHERE a % 2 == 1)
(WHERE a % 2 == 0)
| SORT _fork, a
""";

try (var resp = run(query)) {
assertColumnNames(resp.columns(), List.of("a", "b", "_fork"));
assertColumnTypes(resp.columns(), List.of("integer", "integer", "keyword"));

Iterable<Iterable<Object>> expectedValues = List.of(
List.of(1, 100, "fork1"),
List.of(3, 100, "fork1"),
List.of(2, 100, "fork2"),
List.of(4, 100, "fork2")
);
assertValues(resp.values(), expectedValues);
}
}

public void testSortAndLimitInFirstSubQuery() {
var query = """
FROM test
@@ -216,13 +239,15 @@ public void testWhereSortOnlyInFork() {
( WHERE content:"fox" | SORT id )
( WHERE content:"dog" | SORT id )
| KEEP _fork, id, content
| SORT _fork, id
""";
var queryWithMatchFunction = """
FROM test
| FORK
( WHERE match(content, "fox") | SORT id )
( WHERE match(content, "dog") | SORT id )
| KEEP _fork, id, content
| SORT _fork, id
""";
for (var query : List.of(queryWithMatchOperator, queryWithMatchFunction)) {
try (var resp = run(query)) {
@@ -509,6 +534,7 @@ public void testWithEvalSimple() {
| FORK ( EVAL a = 1 )
( EVAL a = 2 )
| KEEP a, _fork, id, content
| SORT _fork
""";

try (var resp = run(query)) {
@@ -978,9 +978,9 @@ public enum Cap {
MAX_OVER_TIME(Build.current().isSnapshot()),

/**
* Support STATS/EVAL/DISSECT in Fork branches
* Support streaming of sub plan results
*/
FORK_V2(Build.current().isSnapshot()),
FORK_V3(Build.current().isSnapshot()),
Contributor:

It seems obvious now, but I like how this capability can be effectively incremented (without affecting previous releases or excessively polluting the capability list). This could be a good pattern to socialise.

Contributor (Author):

I noticed this approach with lookup join and it made sense to reuse here.
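
In practice the increment looks like this (a sketch combining the two hunks in this change; the skip behaviour described below is the standard required_capability mechanism rather than anything introduced here):

    // EsqlCapabilities.Cap: the previous constant is replaced rather than kept alongside
    FORK_V3(Build.current().isSnapshot()),

    // fork.csv-spec: each FORK test is re-gated on the new capability
    simpleFork
    required_capability: fork_v3

A release that predates this change never advertises fork_v3, so mixed-version test runs skip the updated specs instead of failing them.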


/**
* Does the usage information for ESQL contain a histogram of {@code took} values?
@@ -19,6 +19,7 @@
import org.elasticsearch.xpack.esql.plan.logical.Project;
import org.elasticsearch.xpack.esql.plan.physical.ExchangeExec;
import org.elasticsearch.xpack.esql.plan.physical.FragmentExec;
import org.elasticsearch.xpack.esql.plan.physical.MergeExec;
import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan;
import org.elasticsearch.xpack.esql.rule.Rule;

@@ -45,6 +46,10 @@ public PhysicalPlan apply(PhysicalPlan plan) {

// This will require updating should we choose to have non-unary execution plans in the future.
return plan.transformDown(currentPlanNode -> {
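// stop at a MergeExec (FORK): do not descend into its children, which are split off and planned separately as sub plans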
if (currentPlanNode instanceof MergeExec) {
keepTraversing.set(FALSE);
}

if (keepTraversing.get() == false) {
return currentPlanNode;
}
@@ -36,13 +36,15 @@
import org.elasticsearch.xpack.esql.plan.QueryPlan;
import org.elasticsearch.xpack.esql.plan.logical.EsRelation;
import org.elasticsearch.xpack.esql.plan.logical.Filter;
import org.elasticsearch.xpack.esql.plan.logical.Project;
import org.elasticsearch.xpack.esql.plan.physical.AggregateExec;
import org.elasticsearch.xpack.esql.plan.physical.EsSourceExec;
import org.elasticsearch.xpack.esql.plan.physical.EstimatesRowSize;
import org.elasticsearch.xpack.esql.plan.physical.ExchangeExec;
import org.elasticsearch.xpack.esql.plan.physical.ExchangeSinkExec;
import org.elasticsearch.xpack.esql.plan.physical.ExchangeSourceExec;
import org.elasticsearch.xpack.esql.plan.physical.FragmentExec;
import org.elasticsearch.xpack.esql.plan.physical.MergeExec;
import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan;
import org.elasticsearch.xpack.esql.planner.mapper.LocalMapper;
import org.elasticsearch.xpack.esql.planner.mapper.Mapper;
@@ -67,6 +69,25 @@

public class PlannerUtils {

public static Tuple<List<PhysicalPlan>, PhysicalPlan> breakPlanIntoSubPlansAndMainPlan(PhysicalPlan plan) {
Contributor:

Can you please add a short javadoc description.
var subplans = new Holder<List<PhysicalPlan>>();
PhysicalPlan mainPlan = plan.transformUp(MergeExec.class, me -> {
subplans.set(me.children().stream().map(child -> {
// TODO: we are adding a Project plan to force InsertFieldExtraction - we should remove this transformation
child = child.transformUp(FragmentExec.class, f -> {
var logicalFragment = f.fragment();
logicalFragment = new Project(logicalFragment.source(), logicalFragment, logicalFragment.output());
return new FragmentExec(logicalFragment);
});

return (PhysicalPlan) new ExchangeSinkExec(child.source(), child.output(), false, child);
}).toList());
return new ExchangeSourceExec(me.source(), me.output(), false);
});

return new Tuple<>(subplans.get(), mainPlan);
}

public static Tuple<PhysicalPlan, PhysicalPlan> breakPlanBetweenCoordinatorAndDataNode(PhysicalPlan plan, Configuration config) {
var dataNodePlan = new Holder<PhysicalPlan>();

@@ -41,7 +41,6 @@
import org.elasticsearch.xpack.esql.plan.physical.UnaryExec;
import org.elasticsearch.xpack.esql.plan.physical.inference.RerankExec;

import java.util.ArrayList;
import java.util.List;

/**
@@ -235,12 +234,7 @@ private PhysicalPlan mapBinary(BinaryPlan bp) {
}

private PhysicalPlan mapFork(Fork fork) {
List<PhysicalPlan> physicalChildren = new ArrayList<>();
for (var child : fork.children()) {
var mappedChild = new FragmentExec(child);
physicalChildren.add(mappedChild);
}
return new MergeExec(fork.source(), physicalChildren, fork.output());
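// each FORK branch is mapped to its own physical plan; the MergeExec is later split into
// separately executed sub plans in PlannerUtils.breakPlanIntoSubPlansAndMainPlan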
return new MergeExec(fork.source(), fork.children().stream().map(child -> map(child)).toList(), fork.output());
}

public static boolean isPipelineBreaker(LogicalPlan p) {
@@ -23,6 +23,8 @@
import org.elasticsearch.compute.operator.DriverTaskRunner;
import org.elasticsearch.compute.operator.FailureCollector;
import org.elasticsearch.compute.operator.exchange.ExchangeService;
import org.elasticsearch.compute.operator.exchange.ExchangeSink;
import org.elasticsearch.compute.operator.exchange.ExchangeSinkHandler;
import org.elasticsearch.compute.operator.exchange.ExchangeSourceHandler;
import org.elasticsearch.core.Releasable;
import org.elasticsearch.core.Releasables;
@@ -175,6 +177,89 @@ public void execute(
FoldContext foldContext,
EsqlExecutionInfo execInfo,
ActionListener<Result> listener
) {
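// Plans produced from FORK contain a MergeExec: break such a plan into per-branch
// sub plans plus a main plan. Each sub plan runs as its own child session and streams
// its pages into an exchange sink; the main plan reads them back through a shared
// exchange source. Plans without a MergeExec fall through to executePlan below.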
Tuple<List<PhysicalPlan>, PhysicalPlan> subplansAndMainPlan = PlannerUtils.breakPlanIntoSubPlansAndMainPlan(physicalPlan);

List<PhysicalPlan> subplans = subplansAndMainPlan.v1();

// we have no sub plans, so we can just execute the given plan
if (subplans == null || subplans.size() == 0) {
executePlan(sessionId, rootTask, physicalPlan, configuration, foldContext, execInfo, listener, null);
return;
}

final List<Page> collectedPages = Collections.synchronizedList(new ArrayList<>());
PhysicalPlan mainPlan = new OutputExec(subplansAndMainPlan.v2(), collectedPages::add);

listener = listener.delegateResponse((l, e) -> {
collectedPages.forEach(p -> Releasables.closeExpectNoException(p::releaseBlocks));
l.onFailure(e);
});

var mainSessionId = newChildSession(sessionId);
QueryPragmas queryPragmas = configuration.pragmas();

ExchangeSourceHandler mainExchangeSource = new ExchangeSourceHandler(
queryPragmas.exchangeBufferSize(),
transportService.getThreadPool().executor(ThreadPool.Names.SEARCH)
);

exchangeService.addExchangeSourceHandler(mainSessionId, mainExchangeSource);
var finalListener = ActionListener.runBefore(listener, () -> exchangeService.removeExchangeSourceHandler(sessionId));

for (PhysicalPlan subplan : subplans) {
var childSessionId = newChildSession(sessionId);
ExchangeSinkHandler exchangeSink = exchangeService.createSinkHandler(childSessionId, queryPragmas.exchangeBufferSize());
// funnel sub plan pages into the main plan exchange source
mainExchangeSource.addRemoteSink(exchangeSink::fetchPageAsync, true, () -> {}, 1, ActionListener.noop());
executePlan(childSessionId, rootTask, subplan, configuration, foldContext, execInfo, ActionListener.wrap(result -> {
exchangeSink.addCompletionListener(
ActionListener.running(() -> { exchangeService.finishSinkHandler(childSessionId, null); })
);
}, e -> {
exchangeService.finishSinkHandler(childSessionId, e);
finalListener.onFailure(e);
}), () -> exchangeSink.createExchangeSink(() -> {}));
}

var computeContext = new ComputeContext(
newChildSession(sessionId),
"single",
LOCAL_CLUSTER,
List.of(),
configuration,
foldContext,
mainExchangeSource::createExchangeSource,
null
);

Runnable cancelQueryOnFailure = cancelQueryOnFailure(rootTask);

PhysicalPlan finalMainPlan = mainPlan;

try (
ComputeListener localListener = new ComputeListener(
transportService.getThreadPool(),
cancelQueryOnFailure,
finalListener.map(profiles -> {
execInfo.markEndQuery();
return new Result(finalMainPlan.output(), collectedPages, profiles, execInfo);
})
)
) {
runCompute(rootTask, computeContext, finalMainPlan, localListener.acquireCompute());
}
}

public void executePlan(
String sessionId,
CancellableTask rootTask,
PhysicalPlan physicalPlan,
Configuration configuration,
FoldContext foldContext,
EsqlExecutionInfo execInfo,
ActionListener<Result> listener,
Supplier<ExchangeSink> exchangeSinkSupplier
) {
Tuple<PhysicalPlan, PhysicalPlan> coordinatorAndDataNodePlan = PlannerUtils.breakPlanBetweenCoordinatorAndDataNode(
physicalPlan,
@@ -185,7 +270,12 @@
collectedPages.forEach(p -> Releasables.closeExpectNoException(p::releaseBlocks));
l.onFailure(e);
});
PhysicalPlan coordinatorPlan = new OutputExec(coordinatorAndDataNodePlan.v1(), collectedPages::add);
PhysicalPlan coordinatorPlan = coordinatorAndDataNodePlan.v1();

if (exchangeSinkSupplier == null) {
coordinatorPlan = new OutputExec(coordinatorAndDataNodePlan.v1(), collectedPages::add);
}

PhysicalPlan dataNodePlan = coordinatorAndDataNodePlan.v2();
if (dataNodePlan != null && dataNodePlan instanceof ExchangeSinkExec == false) {
assert false : "expected data node plan starts with an ExchangeSink; got " + dataNodePlan;
@@ -211,7 +301,7 @@
configuration,
foldContext,
null,
null
exchangeSinkSupplier
);
updateShardCountForCoordinatorOnlyQuery(execInfo);
try (
@@ -287,7 +377,7 @@ public void execute(
configuration,
foldContext,
exchangeSource::createExchangeSource,
null
exchangeSinkSupplier
),
coordinatorPlan,
localListener.acquireCompute()