Skip to content

Commit ac3f7d4

Browse files
committed
Make LocalLogicalPlanOptimizer async
1 parent e723e5c commit ac3f7d4

File tree

7 files changed

+174
-110
lines changed

7 files changed

+174
-110
lines changed

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/LocalLogicalPlanOptimizer.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77

88
package org.elasticsearch.xpack.esql.optimizer;
99

10+
import org.elasticsearch.action.ActionListener;
1011
import org.elasticsearch.xpack.esql.optimizer.rules.logical.PropagateEmptyRelation;
1112
import org.elasticsearch.xpack.esql.optimizer.rules.logical.ReplaceStatsFilteredAggWithEval;
1213
import org.elasticsearch.xpack.esql.optimizer.rules.logical.ReplaceStringCasingWithInsensitiveRegexMatch;
@@ -80,7 +81,11 @@ private static Batch<LogicalPlan> localOperators() {
8081
return operators.with(newRules.toArray(Rule[]::new));
8182
}
8283

83-
public LogicalPlan localOptimize(LogicalPlan plan) {
84-
return execute(plan);
84+
// public LogicalPlan localOptimize(LogicalPlan plan) {
85+
// return execute(plan);
86+
// }
87+
88+
public void localOptimize(LogicalPlan plan, ActionListener<LogicalPlan> listener) {
89+
execute(plan, listener);
8590
}
8691
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/PlannerUtils.java

Lines changed: 37 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
package org.elasticsearch.xpack.esql.planner;
99

1010
import org.elasticsearch.TransportVersion;
11+
import org.elasticsearch.action.ActionListener;
1112
import org.elasticsearch.common.Strings;
1213
import org.elasticsearch.common.breaker.NoopCircuitBreaker;
1314
import org.elasticsearch.common.util.BigArrays;
@@ -162,52 +163,58 @@ private static void forEachRelation(PhysicalPlan plan, Consumer<EsRelation> acti
162163
}));
163164
}
164165

165-
public static PhysicalPlan localPlan(
166+
public static void localPlan(
166167
List<SearchExecutionContext> searchContexts,
167168
Configuration configuration,
168169
FoldContext foldCtx,
169-
PhysicalPlan plan
170+
PhysicalPlan plan,
171+
ActionListener<PhysicalPlan> listener
170172
) {
171-
return localPlan(configuration, foldCtx, plan, SearchContextStats.from(searchContexts));
173+
localPlan(configuration, foldCtx, plan, SearchContextStats.from(searchContexts), listener);
172174
}
173175

174-
public static PhysicalPlan localPlan(Configuration configuration, FoldContext foldCtx, PhysicalPlan plan, SearchStats searchStats) {
176+
public static void localPlan(
177+
Configuration configuration,
178+
FoldContext foldCtx,
179+
PhysicalPlan plan,
180+
SearchStats searchStats,
181+
ActionListener<PhysicalPlan> listener
182+
) {
175183
final var logicalOptimizer = new LocalLogicalPlanOptimizer(new LocalLogicalOptimizerContext(configuration, foldCtx, searchStats));
176184
var physicalOptimizer = new LocalPhysicalPlanOptimizer(new LocalPhysicalOptimizerContext(configuration, foldCtx, searchStats));
177185

178-
return localPlan(plan, logicalOptimizer, physicalOptimizer);
186+
localPlan(plan, logicalOptimizer, physicalOptimizer, listener);
179187
}
180188

181-
public static PhysicalPlan localPlan(
189+
public static void localPlan(
182190
PhysicalPlan plan,
183191
LocalLogicalPlanOptimizer logicalOptimizer,
184-
LocalPhysicalPlanOptimizer physicalOptimizer
192+
LocalPhysicalPlanOptimizer physicalOptimizer,
193+
ActionListener<PhysicalPlan> listener
185194
) {
186195
final LocalMapper localMapper = new LocalMapper();
187-
var isCoordPlan = new Holder<>(Boolean.TRUE);
188196

189-
var localPhysicalPlan = plan.transformUp(FragmentExec.class, f -> {
190-
isCoordPlan.set(Boolean.FALSE);
191-
var optimizedFragment = logicalOptimizer.localOptimize(f.fragment());
192-
var physicalFragment = localMapper.map(optimizedFragment);
193-
var filter = f.esFilter();
194-
if (filter != null) {
195-
physicalFragment = physicalFragment.transformUp(
196-
EsSourceExec.class,
197-
query -> new EsSourceExec(
198-
Source.EMPTY,
199-
query.indexPattern(),
200-
query.indexMode(),
201-
query.indexNameWithModes(),
202-
query.output(),
203-
filter
204-
)
205-
);
206-
}
207-
var localOptimized = physicalOptimizer.localOptimize(physicalFragment);
208-
return EstimatesRowSize.estimateRowSize(f.estimatedRowSize(), localOptimized);
209-
});
210-
return isCoordPlan.get() ? plan : localPhysicalPlan;
197+
plan.transformUp(FragmentExec.class, (f, l) -> {
198+
logicalOptimizer.localOptimize(f.fragment(), ActionListener.wrap(optimizedFragment -> {
199+
var physicalFragment = localMapper.map(optimizedFragment);
200+
var filter = f.esFilter();
201+
if (filter != null) {
202+
physicalFragment = physicalFragment.transformUp(
203+
EsSourceExec.class,
204+
query -> new EsSourceExec(
205+
Source.EMPTY,
206+
query.indexPattern(),
207+
query.indexMode(),
208+
query.indexNameWithModes(),
209+
query.output(),
210+
filter
211+
)
212+
);
213+
}
214+
var localOptimized = physicalOptimizer.localOptimize(physicalFragment);
215+
l.onResponse(EstimatesRowSize.estimateRowSize(f.estimatedRowSize(), localOptimized));
216+
}, l::onFailure));
217+
}, listener);
211218
}
212219

213220
/**

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java

Lines changed: 44 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -576,39 +576,51 @@ public SourceProvider createSourceProvider() {
576576

577577
LOGGER.debug("Received physical plan:\n{}", plan);
578578

579-
var localPlan = PlannerUtils.localPlan(context.searchExecutionContexts(), context.configuration(), context.foldCtx(), plan);
580-
// the planner will also set the driver parallelism in LocalExecutionPlanner.LocalExecutionPlan (used down below)
581-
// it's doing this in the planning of EsQueryExec (the source of the data)
582-
// see also EsPhysicalOperationProviders.sourcePhysicalOperation
583-
LocalExecutionPlanner.LocalExecutionPlan localExecutionPlan = planner.plan(context.description(), context.foldCtx(), localPlan);
584-
if (LOGGER.isDebugEnabled()) {
585-
LOGGER.debug("Local execution plan:\n{}", localExecutionPlan.describe());
586-
}
587-
var drivers = localExecutionPlan.createDrivers(context.sessionId());
588-
// After creating the drivers (and therefore, the operators), we can safely decrement the reference count since the operators
589-
// will hold a reference to the contexts where relevant.
590-
contexts.forEach(RefCounted::decRef);
591-
if (drivers.isEmpty()) {
592-
throw new IllegalStateException("no drivers created");
593-
}
594-
LOGGER.debug("using {} drivers", drivers.size());
595-
driverRunner.executeDrivers(
596-
task,
597-
drivers,
598-
transportService.getThreadPool().executor(ESQL_WORKER_THREAD_POOL_NAME),
599-
ActionListener.releaseAfter(listener.map(ignored -> {
600-
if (context.configuration().profile()) {
601-
return DriverCompletionInfo.includingProfiles(
602-
drivers,
603-
context.description(),
604-
clusterService.getClusterName().value(),
605-
transportService.getLocalNode().getName(),
606-
localPlan.toString()
607-
);
608-
} else {
609-
return DriverCompletionInfo.excludingProfiles(drivers);
579+
PlannerUtils.localPlan(
580+
context.searchExecutionContexts(),
581+
context.configuration(),
582+
context.foldCtx(),
583+
plan,
584+
listener.delegateFailureAndWrap((l, localPlan) -> {
585+
// the planner will also set the driver parallelism in LocalExecutionPlanner.LocalExecutionPlan (used down below)
586+
// it's doing this in the planning of EsQueryExec (the source of the data)
587+
// see also EsPhysicalOperationProviders.sourcePhysicalOperation
588+
LocalExecutionPlanner.LocalExecutionPlan localExecutionPlan = planner.plan(
589+
context.description(),
590+
context.foldCtx(),
591+
localPlan
592+
);
593+
if (LOGGER.isDebugEnabled()) {
594+
LOGGER.debug("Local execution plan:\n{}", localExecutionPlan.describe());
610595
}
611-
}), () -> Releasables.close(drivers))
596+
var drivers = localExecutionPlan.createDrivers(context.sessionId());
597+
// After creating the drivers (and therefore, the operators), we can safely decrement the reference count since the
598+
// operators
599+
// will hold a reference to the contexts where relevant.
600+
contexts.forEach(RefCounted::decRef);
601+
if (drivers.isEmpty()) {
602+
throw new IllegalStateException("no drivers created");
603+
}
604+
LOGGER.debug("using {} drivers", drivers.size());
605+
driverRunner.executeDrivers(
606+
task,
607+
drivers,
608+
transportService.getThreadPool().executor(ESQL_WORKER_THREAD_POOL_NAME),
609+
ActionListener.releaseAfter(l.map(ignored -> {
610+
if (context.configuration().profile()) {
611+
return DriverCompletionInfo.includingProfiles(
612+
drivers,
613+
context.description(),
614+
clusterService.getClusterName().value(),
615+
transportService.getLocalNode().getName(),
616+
localPlan.toString()
617+
);
618+
} else {
619+
return DriverCompletionInfo.excludingProfiles(drivers);
620+
}
621+
}), () -> Releasables.close(drivers))
622+
);
623+
})
612624
);
613625
} catch (Exception e) {
614626
listener.onFailure(e);

x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/CsvTests.java

Lines changed: 48 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -707,21 +707,58 @@ void executeSubPlan(
707707
List<Page> collectedPages = Collections.synchronizedList(new ArrayList<>());
708708

709709
// replace fragment inside the coordinator plan
710-
List<Driver> drivers = new ArrayList<>();
711710
LocalExecutionPlan coordinatorNodeExecutionPlan = executionPlanner.plan(
712711
"final",
713712
foldCtx,
714713
new OutputExec(coordinatorPlan, collectedPages::add)
715714
);
716-
drivers.addAll(coordinatorNodeExecutionPlan.createDrivers(getTestName()));
717-
if (dataNodePlan != null) {
718-
var searchStats = new DisabledSearchStats();
719-
var logicalTestOptimizer = new LocalLogicalPlanOptimizer(new LocalLogicalOptimizerContext(configuration, foldCtx, searchStats));
720-
var physicalTestOptimizer = new TestLocalPhysicalPlanOptimizer(
721-
new LocalPhysicalOptimizerContext(configuration, foldCtx, searchStats)
722-
);
723715

724-
var csvDataNodePhysicalPlan = PlannerUtils.localPlan(dataNodePlan, logicalTestOptimizer, physicalTestOptimizer);
716+
addDataNodePlan(
717+
coordinatorNodeExecutionPlan.createDrivers(getTestName()),
718+
dataNodePlan,
719+
foldCtx,
720+
exchangeSource,
721+
exchangeSink,
722+
executionPlanner,
723+
listener.delegateFailureAndWrap((l, drivers) -> {
724+
DriverRunner runner = new DriverRunner(threadPool.getThreadContext()) {
725+
@Override
726+
protected void start(Driver driver, ActionListener<Void> driverListener) {
727+
Driver.start(threadPool.getThreadContext(), executor, driver, between(1, 1000), driverListener);
728+
}
729+
};
730+
731+
// Execute the drivers
732+
l = ActionListener.releaseAfter(l, () -> Releasables.close(drivers));
733+
runner.runToCompletion(
734+
drivers,
735+
l.map(ignore -> new Result(physicalPlan.output(), collectedPages, DriverCompletionInfo.EMPTY, null))
736+
);
737+
})
738+
);
739+
740+
}
741+
742+
private void addDataNodePlan(
743+
List<Driver> drivers,
744+
PhysicalPlan dataNodePlan,
745+
FoldContext foldCtx,
746+
ExchangeSourceHandler exchangeSource,
747+
ExchangeSinkHandler exchangeSink,
748+
LocalExecutionPlanner executionPlanner,
749+
ActionListener<List<Driver>> listener
750+
) {
751+
if (dataNodePlan == null) {
752+
listener.onResponse(drivers);
753+
return;
754+
}
755+
756+
var searchStats = new DisabledSearchStats();
757+
var logicalTestOptimizer = new LocalLogicalPlanOptimizer(new LocalLogicalOptimizerContext(configuration, foldCtx, searchStats));
758+
var physicalTestOptimizer = new TestLocalPhysicalPlanOptimizer(
759+
new LocalPhysicalOptimizerContext(configuration, foldCtx, searchStats)
760+
);
761+
PlannerUtils.localPlan(dataNodePlan, logicalTestOptimizer, physicalTestOptimizer, listener.map(csvDataNodePhysicalPlan -> {
725762
exchangeSource.addRemoteSink(
726763
exchangeSink::fetchPageAsync,
727764
Randomness.get().nextBoolean(),
@@ -732,21 +769,9 @@ void executeSubPlan(
732769
})
733770
);
734771
LocalExecutionPlan dataNodeExecutionPlan = executionPlanner.plan("data", foldCtx, csvDataNodePhysicalPlan);
735-
736772
drivers.addAll(dataNodeExecutionPlan.createDrivers(getTestName()));
737773
Randomness.shuffle(drivers);
738-
}
739-
// Execute the drivers
740-
DriverRunner runner = new DriverRunner(threadPool.getThreadContext()) {
741-
@Override
742-
protected void start(Driver driver, ActionListener<Void> driverListener) {
743-
Driver.start(threadPool.getThreadContext(), executor, driver, between(1, 1000), driverListener);
744-
}
745-
};
746-
listener = ActionListener.releaseAfter(listener, () -> Releasables.close(drivers));
747-
runner.runToCompletion(
748-
drivers,
749-
listener.map(ignore -> new Result(physicalPlan.output(), collectedPages, DriverCompletionInfo.EMPTY, null))
750-
);
774+
return drivers;
775+
}));
751776
}
752777
}

x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LocalLogicalPlanOptimizerTests.java

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
package org.elasticsearch.xpack.esql.optimizer;
99

1010
import org.apache.lucene.util.BytesRef;
11+
import org.elasticsearch.action.support.ListenableActionFuture;
1112
import org.elasticsearch.common.io.stream.StreamOutput;
1213
import org.elasticsearch.common.util.Maps;
1314
import org.elasticsearch.index.IndexMode;
@@ -512,9 +513,10 @@ public void testSparseDocument() throws Exception {
512513
var analyzed = analyze(analyzer, parser.createStatement(query));
513514
var optimized = logicalOptimizer.optimize(analyzed);
514515
var localContext = new LocalLogicalOptimizerContext(EsqlTestUtils.TEST_CFG, FoldContext.small(), searchStats);
515-
var plan = new LocalLogicalPlanOptimizer(localContext).localOptimize(optimized);
516+
ListenableActionFuture<LogicalPlan> localPlanListener = new ListenableActionFuture<>();
517+
new LocalLogicalPlanOptimizer(localContext).localOptimize(optimized, localPlanListener);
516518

517-
var project = as(plan, Project.class);
519+
var project = as(localPlanListener.actionResult(), Project.class);
518520
assertThat(project.projections(), hasSize(10));
519521
assertThat(
520522
Expressions.names(project.projections()),
@@ -800,9 +802,10 @@ private LogicalPlan plan(String query) {
800802
private LogicalPlan localPlan(LogicalPlan plan, SearchStats searchStats) {
801803
var localContext = new LocalLogicalOptimizerContext(EsqlTestUtils.TEST_CFG, FoldContext.small(), searchStats);
802804
// System.out.println(plan);
803-
var localPlan = new LocalLogicalPlanOptimizer(localContext).localOptimize(plan);
805+
ListenableActionFuture<LogicalPlan> localPlan = new ListenableActionFuture<>();
806+
new LocalLogicalPlanOptimizer(localContext).localOptimize(plan, localPlan);
804807
// System.out.println(localPlan);
805-
return localPlan;
808+
return localPlan.actionResult();
806809
}
807810

808811
private LogicalPlan localPlan(String query) {

x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/PhysicalPlanOptimizerTests.java

Lines changed: 19 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111

1212
import org.apache.lucene.util.BytesRef;
1313
import org.elasticsearch.Build;
14+
import org.elasticsearch.action.support.ListenableActionFuture;
1415
import org.elasticsearch.common.geo.ShapeRelation;
1516
import org.elasticsearch.common.lucene.BytesRefs;
1617
import org.elasticsearch.common.settings.Settings;
@@ -7842,7 +7843,8 @@ private LocalExecutionPlanner.LocalExecutionPlan physicalOperationsFromPhysicalP
78427843
// The TopN needs an estimated row size for the planner to work
78437844
var plans = PlannerUtils.breakPlanBetweenCoordinatorAndDataNode(EstimatesRowSize.estimateRowSize(0, plan), config);
78447845
plan = useDataNodePlan ? plans.v2() : plans.v1();
7845-
plan = PlannerUtils.localPlan(config, FoldContext.small(), plan, TEST_SEARCH_STATS);
7846+
ListenableActionFuture<PhysicalPlan> localPlanFuture = new ListenableActionFuture<>();
7847+
PlannerUtils.localPlan(config, FoldContext.small(), plan, TEST_SEARCH_STATS, localPlanFuture);
78467848
ExchangeSinkHandler exchangeSinkHandler = new ExchangeSinkHandler(null, 10, () -> 10);
78477849
LocalExecutionPlanner planner = new LocalExecutionPlanner(
78487850
"test",
@@ -7861,7 +7863,7 @@ private LocalExecutionPlanner.LocalExecutionPlan physicalOperationsFromPhysicalP
78617863
List.of()
78627864
);
78637865

7864-
return planner.plan("test", FoldContext.small(), plan);
7866+
return planner.plan("test", FoldContext.small(), localPlanFuture.actionResult());
78657867
}
78667868

78677869
private List<Set<String>> findFieldNamesInLookupJoinDescription(LocalExecutionPlanner.LocalExecutionPlan physicalOperations) {
@@ -8208,15 +8210,22 @@ private PhysicalPlan optimizedPlan(PhysicalPlan plan, SearchStats searchStats) {
82088210
// this is of no use in the unit tests, which checks the plan as a whole instead of each
82098211
// individually hence why here the plan is kept as is
82108212

8211-
var l = p.transformUp(FragmentExec.class, fragment -> {
8212-
var localPlan = PlannerUtils.localPlan(config, FoldContext.small(), fragment, searchStats);
8213-
return EstimatesRowSize.estimateRowSize(fragment.estimatedRowSize(), localPlan);
8214-
});
8213+
ListenableActionFuture<PhysicalPlan> l = new ListenableActionFuture<>();
82158214

8216-
// handle local reduction alignment
8217-
l = localRelationshipAlignment(l);
8218-
// System.out.println("* Localized DataNode Plan\n" + l);
8219-
return l;
8215+
p.transformUp(
8216+
FragmentExec.class,
8217+
(fragment, listener) -> PlannerUtils.localPlan(
8218+
config,
8219+
FoldContext.small(),
8220+
fragment,
8221+
searchStats,
8222+
listener.map(localPlan -> EstimatesRowSize.estimateRowSize(fragment.estimatedRowSize(), localPlan))
8223+
),
8224+
// handle local reduction alignment
8225+
l.map(PhysicalPlanOptimizerTests::localRelationshipAlignment)
8226+
);
8227+
8228+
return l.actionResult();
82208229
}
82218230

82228231
static SearchStats statsWithIndexedFields(String... names) {

0 commit comments

Comments
 (0)