1515import org .elasticsearch .action .search .ShardSearchFailure ;
1616import org .elasticsearch .action .support .SubscribableListener ;
1717import org .elasticsearch .common .collect .Iterators ;
18+ import org .elasticsearch .common .unit .ByteSizeValue ;
1819import org .elasticsearch .compute .data .Block ;
20+ import org .elasticsearch .compute .data .BlockFactory ;
1921import org .elasticsearch .compute .data .BlockUtils ;
2022import org .elasticsearch .compute .data .Page ;
2123import org .elasticsearch .compute .operator .DriverCompletionInfo ;
8486import java .util .List ;
8587import java .util .Map ;
8688import java .util .Set ;
89+ import java .util .concurrent .atomic .AtomicReference ;
8790
8891import static java .util .stream .Collectors .joining ;
8992import static java .util .stream .Collectors .toSet ;
9093import static org .elasticsearch .index .query .QueryBuilders .boolQuery ;
9194import static org .elasticsearch .xpack .esql .core .tree .Source .EMPTY ;
9295import static org .elasticsearch .xpack .esql .plan .logical .join .InlineJoin .firstSubPlan ;
96+ import static org .elasticsearch .xpack .esql .session .SessionUtils .checkPagesBelowSize ;
9397
9498public class EsqlSession {
9599
@@ -104,6 +108,9 @@ public interface PlanRunner {
104108 }
105109
106110 private static final TransportVersion LOOKUP_JOIN_CCS = TransportVersion .fromName ("lookup_join_ccs" );
111+ private static final double INTERMEDIATE_LOCAL_RELATION_CB_PERCETAGE = .1 ;
112+ private static final long INTERMEDIATE_LOCAL_RELATION_MIN_SIZE = ByteSizeValue .ofMb (1 ).getBytes ();
113+ private static final long INTERMEDIATE_LOCAL_RELATION_MAX_SIZE = ByteSizeValue .ofMb (30 ).getBytes ();
107114
108115 private final String sessionId ;
109116 private final Configuration configuration ;
@@ -123,6 +130,8 @@ public interface PlanRunner {
123130 private final IndicesExpressionGrouper indicesExpressionGrouper ;
124131 private final InferenceService inferenceService ;
125132 private final RemoteClusterService remoteClusterService ;
133+ private final BlockFactory blockFactory ;
134+ private final long maxIntermediateLocalRelationSize ;
126135
127136 private boolean explainMode ;
128137 private String parsedPlanString ;
@@ -159,6 +168,8 @@ public EsqlSession(
159168 this .inferenceService = services .inferenceService ();
160169 this .preMapper = new PreMapper (services );
161170 this .remoteClusterService = services .transportService ().getRemoteClusterService ();
171+ this .blockFactory = services .blockFactoryProvider ().blockFactory ();
172+ maxIntermediateLocalRelationSize = maxIntermediateLocalRelationSize (blockFactory );
162173 }
163174
164175 public String sessionId () {
@@ -279,10 +290,13 @@ private void executeSubPlan(
279290 executionInfo .startSubPlans ();
280291
281292 runner .run (physicalSubPlan , listener .delegateFailureAndWrap ((next , result ) -> {
293+ AtomicReference <Block []> localRelationBlocks = new AtomicReference <>();
282294 try {
283295 // Translate the subquery into a separate, coordinator based plan and the results 'broadcasted' as a local relation
284296 completionInfoAccumulator .accumulate (result .completionInfo ());
285- LocalRelation resultWrapper = resultToPlan (subPlans .stubReplacedSubPlan (), result );
297+ LocalRelation resultWrapper = resultToPlan (subPlans .stubReplacedSubPlan ().source (), result );
298+ localRelationBlocks .set (resultWrapper .supplier ().get ());
299+ var releasingNext = ActionListener .runAfter (next , () -> releaseLocalRelationBlocks (localRelationBlocks ));
286300
287301 // replace the original logical plan with the backing result
288302 LogicalPlan newLogicalPlan = optimizedPlan .transformUp (
@@ -301,27 +315,54 @@ private void executeSubPlan(
301315 executionInfo .finishSubPlans ();
302316 LOGGER .debug ("Executing final plan:\n {}" , newLogicalPlan );
303317 var newPhysicalPlan = logicalPlanToPhysicalPlan (newLogicalPlan , request );
304- runner .run (newPhysicalPlan , next .delegateFailureAndWrap ((finalListener , finalResult ) -> {
318+ runner .run (newPhysicalPlan , releasingNext .delegateFailureAndWrap ((finalListener , finalResult ) -> {
305319 completionInfoAccumulator .accumulate (finalResult .completionInfo ());
306320 finalListener .onResponse (
307321 new Result (finalResult .schema (), finalResult .pages (), completionInfoAccumulator .finish (), executionInfo )
308322 );
309323 }));
310324 } else {// continue executing the subplans
311- executeSubPlan (completionInfoAccumulator , newLogicalPlan , newSubPlan , executionInfo , runner , request , listener );
325+ executeSubPlan (completionInfoAccumulator , newLogicalPlan , newSubPlan , executionInfo , runner , request , releasingNext );
312326 }
327+ } catch (Exception e ) {
328+ // safely release the blocks in case an exception occurs either before, but also after the "final" runner.run() forks off
329+ // the current thread, but with the blocks still referenced
330+ releaseLocalRelationBlocks (localRelationBlocks );
331+ throw e ;
313332 } finally {
314333 Releasables .closeExpectNoException (Releasables .wrap (Iterators .map (result .pages ().iterator (), p -> p ::releaseBlocks )));
315334 }
316335 }));
317336 }
318337
319- private static LocalRelation resultToPlan (LogicalPlan plan , Result result ) {
338+ private LocalRelation resultToPlan (Source planSource , Result result ) {
320339 List <Page > pages = result .pages ();
340+ checkPagesBelowSize (
341+ pages ,
342+ maxIntermediateLocalRelationSize ,
343+ (actual ) -> "sub-plan execution results too large ["
344+ + ByteSizeValue .ofBytes (actual )
345+ + "] > "
346+ + ByteSizeValue .ofBytes (maxIntermediateLocalRelationSize )
347+ );
321348 List <Attribute > schema = result .schema ();
322- // if (pages.size() > 1) {
323- Block [] blocks = SessionUtils .fromPages (schema , pages );
324- return new LocalRelation (plan .source (), schema , LocalSupplier .of (blocks ));
349+ Block [] blocks = SessionUtils .fromPages (schema , pages , blockFactory );
350+ return new LocalRelation (planSource , schema , LocalSupplier .of (blocks ));
351+ }
352+
353+ private static void releaseLocalRelationBlocks (AtomicReference <Block []> localRelationBlocks ) {
354+ Block [] relationBlocks = localRelationBlocks .getAndSet (null );
355+ if (relationBlocks != null ) {
356+ Releasables .closeExpectNoException (relationBlocks );
357+ }
358+ }
359+
360+ // returns INTERMEDIATE_LOCAL_RELATION_CB_PERCETAGE percent of the circuit breaker limit, but at least
361+ // INTERMEDIATE_LOCAL_RELATION_MIN_SIZE and at most INTERMEDIATE_LOCAL_RELATION_MAX_SIZE
362+ static long maxIntermediateLocalRelationSize (BlockFactory blockFactory ) {
363+ long breakerLimit = blockFactory .breaker ().getLimit ();
364+ long percentageLimit = (long ) (breakerLimit * INTERMEDIATE_LOCAL_RELATION_CB_PERCETAGE / 100.d );
365+ return Math .min (Math .max (percentageLimit , INTERMEDIATE_LOCAL_RELATION_MIN_SIZE ), INTERMEDIATE_LOCAL_RELATION_MAX_SIZE );
325366 }
326367
327368 private EsqlStatement parse (String query , QueryParams params ) {
0 commit comments