19
19
import org .elasticsearch .common .util .concurrent .RunOnce ;
20
20
import org .elasticsearch .compute .data .BlockFactory ;
21
21
import org .elasticsearch .compute .data .Page ;
22
+ import org .elasticsearch .compute .operator .Driver ;
22
23
import org .elasticsearch .compute .operator .DriverCompletionInfo ;
23
24
import org .elasticsearch .compute .operator .DriverTaskRunner ;
24
25
import org .elasticsearch .compute .operator .FailureCollector ;
@@ -658,31 +659,7 @@ public SourceProvider createSourceProvider(SourceFilter sourceFilter) {
658
659
throw new IllegalStateException ("no drivers created" );
659
660
}
660
661
LOGGER .debug ("using {} drivers" , drivers .size ());
661
- ActionListener <Void > driverListener = listener .map (ignored -> {
662
- if (LOGGER .isDebugEnabled ()) {
663
- LOGGER .debug (
664
- "finished {}" ,
665
- DriverCompletionInfo .includingProfiles (
666
- drivers ,
667
- context .description (),
668
- clusterService .getClusterName ().value (),
669
- transportService .getLocalNode ().getName (),
670
- localPlan .toString ()
671
- )
672
- );
673
- }
674
- if (context .configuration ().profile ()) {
675
- return DriverCompletionInfo .includingProfiles (
676
- drivers ,
677
- context .description (),
678
- clusterService .getClusterName ().value (),
679
- transportService .getLocalNode ().getName (),
680
- localPlan .toString ()
681
- );
682
- } else {
683
- return DriverCompletionInfo .excludingProfiles (drivers );
684
- }
685
- });
662
+ ActionListener <Void > driverListener = addCompletionInfo (listener , drivers , context , localPlan );
686
663
driverRunner .executeDrivers (
687
664
task ,
688
665
drivers ,
@@ -694,6 +671,50 @@ public SourceProvider createSourceProvider(SourceFilter sourceFilter) {
694
671
}
695
672
}
696
673
674
+ ActionListener <Void > addCompletionInfo (
675
+ ActionListener <DriverCompletionInfo > listener ,
676
+ List <Driver > drivers ,
677
+ ComputeContext context ,
678
+ PhysicalPlan localPlan
679
+ ) {
680
+ /*
681
+ * We *really* don't want to close over the localPlan because it can
682
+ * be quite large, and it isn't tracked.
683
+ */
684
+ boolean needPlanString = LOGGER .isDebugEnabled () || context .configuration ().profile ();
685
+ String planString = needPlanString ? localPlan .toString () : null ;
686
+ return listener .map (ignored -> {
687
+ if (LOGGER .isDebugEnabled ()) {
688
+ LOGGER .debug (
689
+ "finished {}" ,
690
+ DriverCompletionInfo .includingProfiles (
691
+ drivers ,
692
+ context .description (),
693
+ clusterService .getClusterName ().value (),
694
+ transportService .getLocalNode ().getName (),
695
+ planString
696
+ )
697
+ );
698
+ /*
699
+ * planString *might* be null if we *just* set DEBUG to *after*
700
+ * we built the listener but before we got here. That's something
701
+ * we can live with.
702
+ */
703
+ }
704
+ if (context .configuration ().profile ()) {
705
+ return DriverCompletionInfo .includingProfiles (
706
+ drivers ,
707
+ context .description (),
708
+ clusterService .getClusterName ().value (),
709
+ transportService .getLocalNode ().getName (),
710
+ planString
711
+ );
712
+ } else {
713
+ return DriverCompletionInfo .excludingProfiles (drivers );
714
+ }
715
+ });
716
+ }
717
+
697
718
static PhysicalPlan reductionPlan (ExchangeSinkExec plan , boolean enable ) {
698
719
PhysicalPlan reducePlan = new ExchangeSourceExec (plan .source (), plan .output (), plan .isIntermediateAgg ());
699
720
if (enable ) {
0 commit comments