@@ -636,19 +636,14 @@ private static Set<Integer> setParamsForRecursiveCteNode(List<PipelineDistribute
636636
637637 List <RecursiveCteNode > recursiveCteNodes = planFragment .getPlanRoot ()
638638 .collectInCurrentFragment (RecursiveCteNode .class ::isInstance );
639- if (!recursiveCteNodes .isEmpty ()) {
640- if (recursiveCteNodes .size () != 1 ) {
641- throw new IllegalStateException (
642- String .format ("one fragment can only have 1 recursive cte node, but there is %d" ,
643- recursiveCteNodes .size ()));
644- }
645-
639+ for (RecursiveCteNode recursiveCteNode : recursiveCteNodes ) {
646640 List <TRecCTETarget > targets = new ArrayList <>();
647641 List <TRecCTEResetInfo > fragmentsToReset = new ArrayList <>();
648- // PhysicalPlanTranslator will swap recursiveCteNode's child fragment,
649- // so we get recursive one by 1st child and collect all child fragment of recursive side
642+ // recursiveCteNode's right child is recursive part (exchange node)
643+ // so we get collect all child fragment from exchange node's child node
650644 List <PlanFragment > childFragments = new ArrayList <>();
651- planFragment .getChild (0 ).collectAll (PlanFragment .class ::isInstance , childFragments );
645+ recursiveCteNode .getChild (1 ).getChild (0 ).getFragment ().collectAll (PlanFragment .class ::isInstance ,
646+ childFragments );
652647 for (PlanFragment child : childFragments ) {
653648 PlanFragmentId childFragmentId = child .getFragmentId ();
654649 // the fragment need to be notified to close
@@ -676,7 +671,6 @@ private static Set<Integer> setParamsForRecursiveCteNode(List<PipelineDistribute
676671 }
677672 }
678673
679- RecursiveCteNode recursiveCteNode = recursiveCteNodes .get (0 );
680674 List <List <Expr >> materializedResultExprLists = recursiveCteNode .getMaterializedResultExprLists ();
681675 List <List <TExpr >> texprLists = new ArrayList <>(materializedResultExprLists .size ());
682676 for (List <Expr > exprList : materializedResultExprLists ) {
0 commit comments