@@ -195,7 +195,7 @@ public void execute(
195195
196196 // we have no sub plans, so we can just execute the given plan
197197 if (subplans == null || subplans .size () == 0 ) {
198- executePlan (sessionId , rootTask , physicalPlan , configuration , foldContext , execInfo , listener , null );
198+ executePlan (sessionId , rootTask , physicalPlan , configuration , foldContext , execInfo , null , listener , null );
199199 return ;
200200 }
201201
@@ -220,7 +220,7 @@ public void execute(
220220 var finalListener = ActionListener .runBefore (listener , () -> exchangeService .removeExchangeSourceHandler (sessionId ));
221221 var computeContext = new ComputeContext (
222222 mainSessionId ,
223- "single " ,
223+ "main.final " ,
224224 LOCAL_CLUSTER ,
225225 List .of (),
226226 configuration ,
@@ -244,22 +244,33 @@ public void execute(
244244 ) {
245245 runCompute (rootTask , computeContext , finalMainPlan , localListener .acquireCompute ());
246246
247- for (PhysicalPlan subplan : subplans ) {
247+ for (int i = 0 ; i < subplans .size (); i ++) {
248+ var subplan = subplans .get (i );
248249 var childSessionId = newChildSession (sessionId );
249250 ExchangeSinkHandler exchangeSink = exchangeService .createSinkHandler (childSessionId , queryPragmas .exchangeBufferSize ());
250251 // funnel sub plan pages into the main plan exchange source
251252 mainExchangeSource .addRemoteSink (exchangeSink ::fetchPageAsync , true , () -> {}, 1 , ActionListener .noop ());
252253 var subPlanListener = localListener .acquireCompute ();
253254
254- executePlan (childSessionId , rootTask , subplan , configuration , foldContext , execInfo , ActionListener .wrap (result -> {
255- exchangeSink .addCompletionListener (
256- ActionListener .running (() -> { exchangeService .finishSinkHandler (childSessionId , null ); })
257- );
258- subPlanListener .onResponse (result .completionInfo ());
259- }, e -> {
260- exchangeService .finishSinkHandler (childSessionId , e );
261- subPlanListener .onFailure (e );
262- }), () -> exchangeSink .createExchangeSink (() -> {}));
255+ executePlan (
256+ childSessionId ,
257+ rootTask ,
258+ subplan ,
259+ configuration ,
260+ foldContext ,
261+ execInfo ,
262+ "subplan-" + i ,
263+ ActionListener .wrap (result -> {
264+ exchangeSink .addCompletionListener (
265+ ActionListener .running (() -> { exchangeService .finishSinkHandler (childSessionId , null ); })
266+ );
267+ subPlanListener .onResponse (result .completionInfo ());
268+ }, e -> {
269+ exchangeService .finishSinkHandler (childSessionId , e );
270+ subPlanListener .onFailure (e );
271+ }),
272+ () -> exchangeSink .createExchangeSink (() -> {})
273+ );
263274 }
264275 }
265276 }
@@ -272,6 +283,7 @@ public void executePlan(
272283 Configuration configuration ,
273284 FoldContext foldContext ,
274285 EsqlExecutionInfo execInfo ,
286+ String profileQualifier ,
275287 ActionListener <Result > listener ,
276288 Supplier <ExchangeSink > exchangeSinkSupplier
277289 ) {
@@ -309,7 +321,7 @@ public void executePlan(
309321 }
310322 var computeContext = new ComputeContext (
311323 newChildSession (sessionId ),
312- "single" ,
324+ profileDescription ( profileQualifier , "single" ) ,
313325 LOCAL_CLUSTER ,
314326 List .of (),
315327 configuration ,
@@ -395,7 +407,7 @@ public void executePlan(
395407 rootTask ,
396408 new ComputeContext (
397409 sessionId ,
398- "final" ,
410+ profileDescription ( profileQualifier , "final" ) ,
399411 LOCAL_CLUSTER ,
400412 List .of (),
401413 configuration ,
@@ -611,6 +623,10 @@ String newChildSession(String session) {
611623 return session + "/" + childSessionIdGenerator .incrementAndGet ();
612624 }
613625
626+ String profileDescription (String qualifier , String label ) {
627+ return qualifier == null ? label : qualifier + "." + label ;
628+ }
629+
614630 Runnable cancelQueryOnFailure (CancellableTask task ) {
615631 return new RunOnce (() -> {
616632 LOGGER .debug ("cancelling ESQL task {} on failure" , task );
0 commit comments