@@ -461,6 +461,9 @@ var (
461461 cannotDistributeVectorSearchErr = newQueryNotSupportedError (
462462 "vector search operation cannot be distributed" ,
463463 )
464+ cannotDistributeSystemColumnsAndBufferedWrites = newQueryNotSupportedError (
465+ "system column (that requires MVCC decoding) is requested when writes have been buffered" ,
466+ )
464467)
465468
466469// mustWrapNode returns true if a node has no DistSQL-processor equivalent.
@@ -546,6 +549,7 @@ func checkSupportForPlanNode(
546549 node planNode ,
547550 distSQLVisitor * distSQLExprCheckVisitor ,
548551 sd * sessiondata.SessionData ,
552+ txnHasBufferedWrites bool ,
549553) (retRec distRecommendation , retErr error ) {
550554 if buildutil .CrdbTestBuild {
551555 defer func () {
@@ -563,19 +567,19 @@ func checkSupportForPlanNode(
563567 return shouldDistribute , nil
564568
565569 case * distinctNode :
566- return checkSupportForPlanNode (ctx , n .input , distSQLVisitor , sd )
570+ return checkSupportForPlanNode (ctx , n .input , distSQLVisitor , sd , txnHasBufferedWrites )
567571
568572 case * exportNode :
569- return checkSupportForPlanNode (ctx , n .input , distSQLVisitor , sd )
573+ return checkSupportForPlanNode (ctx , n .input , distSQLVisitor , sd , txnHasBufferedWrites )
570574
571575 case * filterNode :
572576 if err := checkExprForDistSQL (n .filter , distSQLVisitor ); err != nil {
573577 return cannotDistribute , err
574578 }
575- return checkSupportForPlanNode (ctx , n .input , distSQLVisitor , sd )
579+ return checkSupportForPlanNode (ctx , n .input , distSQLVisitor , sd , txnHasBufferedWrites )
576580
577581 case * groupNode :
578- rec , err := checkSupportForPlanNode (ctx , n .input , distSQLVisitor , sd )
582+ rec , err := checkSupportForPlanNode (ctx , n .input , distSQLVisitor , sd , txnHasBufferedWrites )
579583 if err != nil {
580584 return cannotDistribute , err
581585 }
@@ -604,10 +608,14 @@ func checkSupportForPlanNode(
604608 // TODO(nvanbenschoten): lift this restriction.
605609 return cannotDistribute , cannotDistributeRowLevelLockingErr
606610 }
607- return checkSupportForPlanNode (ctx , n .input , distSQLVisitor , sd )
611+ if txnHasBufferedWrites && n .fetch .requiresMVCCDecoding () {
612+ // TODO(#144166): relax this.
613+ return cannotDistribute , cannotDistributeSystemColumnsAndBufferedWrites
614+ }
615+ return checkSupportForPlanNode (ctx , n .input , distSQLVisitor , sd , txnHasBufferedWrites )
608616
609617 case * invertedFilterNode :
610- return checkSupportForInvertedFilterNode (ctx , n , distSQLVisitor , sd )
618+ return checkSupportForInvertedFilterNode (ctx , n , distSQLVisitor , sd , txnHasBufferedWrites )
611619
612620 case * invertedJoinNode :
613621 if n .fetch .lockingStrength != descpb .ScanLockingStrength_FOR_NONE {
@@ -617,10 +625,14 @@ func checkSupportForPlanNode(
617625 // TODO(nvanbenschoten): lift this restriction.
618626 return cannotDistribute , cannotDistributeRowLevelLockingErr
619627 }
628+ if txnHasBufferedWrites && n .fetch .requiresMVCCDecoding () {
629+ // TODO(#144166): relax this.
630+ return cannotDistribute , cannotDistributeSystemColumnsAndBufferedWrites
631+ }
620632 if err := checkExprForDistSQL (n .onExpr , distSQLVisitor ); err != nil {
621633 return cannotDistribute , err
622634 }
623- rec , err := checkSupportForPlanNode (ctx , n .input , distSQLVisitor , sd )
635+ rec , err := checkSupportForPlanNode (ctx , n .input , distSQLVisitor , sd , txnHasBufferedWrites )
624636 if err != nil {
625637 return cannotDistribute , err
626638 }
@@ -633,11 +645,11 @@ func checkSupportForPlanNode(
633645 if err := checkExprForDistSQL (n .pred .onCond , distSQLVisitor ); err != nil {
634646 return cannotDistribute , err
635647 }
636- recLeft , err := checkSupportForPlanNode (ctx , n .left , distSQLVisitor , sd )
648+ recLeft , err := checkSupportForPlanNode (ctx , n .left , distSQLVisitor , sd , txnHasBufferedWrites )
637649 if err != nil {
638650 return cannotDistribute , err
639651 }
640- recRight , err := checkSupportForPlanNode (ctx , n .right , distSQLVisitor , sd )
652+ recRight , err := checkSupportForPlanNode (ctx , n .right , distSQLVisitor , sd , txnHasBufferedWrites )
641653 if err != nil {
642654 return cannotDistribute , err
643655 }
@@ -658,7 +670,7 @@ func checkSupportForPlanNode(
658670 // Note that we don't need to check whether we support distribution of
659671 // n.countExpr or n.offsetExpr because those expressions are evaluated
660672 // locally, during the physical planning.
661- return checkSupportForPlanNode (ctx , n .input , distSQLVisitor , sd )
673+ return checkSupportForPlanNode (ctx , n .input , distSQLVisitor , sd , txnHasBufferedWrites )
662674
663675 case * lookupJoinNode :
664676 if n .remoteLookupExpr != nil || n .remoteOnlyLookups {
@@ -672,7 +684,10 @@ func checkSupportForPlanNode(
672684 // TODO(nvanbenschoten): lift this restriction.
673685 return cannotDistribute , cannotDistributeRowLevelLockingErr
674686 }
675-
687+ if txnHasBufferedWrites && n .fetch .requiresMVCCDecoding () {
688+ // TODO(#144166): relax this.
689+ return cannotDistribute , cannotDistributeSystemColumnsAndBufferedWrites
690+ }
676691 if err := checkExprForDistSQL (n .lookupExpr , distSQLVisitor ); err != nil {
677692 return cannotDistribute , err
678693 }
@@ -682,7 +697,7 @@ func checkSupportForPlanNode(
682697 if err := checkExprForDistSQL (n .onCond , distSQLVisitor ); err != nil {
683698 return cannotDistribute , err
684699 }
685- rec , err := checkSupportForPlanNode (ctx , n .input , distSQLVisitor , sd )
700+ rec , err := checkSupportForPlanNode (ctx , n .input , distSQLVisitor , sd , txnHasBufferedWrites )
686701 if err != nil {
687702 return cannotDistribute , err
688703 }
@@ -699,15 +714,15 @@ func checkSupportForPlanNode(
699714 return cannotDistribute , err
700715 }
701716 }
702- return checkSupportForPlanNode (ctx , n .input , distSQLVisitor , sd )
717+ return checkSupportForPlanNode (ctx , n .input , distSQLVisitor , sd , txnHasBufferedWrites )
703718
704719 case * renderNode :
705720 for _ , e := range n .render {
706721 if err := checkExprForDistSQL (e , distSQLVisitor ); err != nil {
707722 return cannotDistribute , err
708723 }
709724 }
710- return checkSupportForPlanNode (ctx , n .input , distSQLVisitor , sd )
725+ return checkSupportForPlanNode (ctx , n .input , distSQLVisitor , sd , txnHasBufferedWrites )
711726
712727 case * scanNode :
713728 if n .lockingStrength != descpb .ScanLockingStrength_FOR_NONE {
@@ -721,6 +736,10 @@ func checkSupportForPlanNode(
721736 // This is a locality optimized scan.
722737 return cannotDistribute , localityOptimizedOpNotDistributableErr
723738 }
739+ if txnHasBufferedWrites && n .fetchPlanningInfo .requiresMVCCDecoding () {
740+ // TODO(#144166): relax this.
741+ return cannotDistribute , cannotDistributeSystemColumnsAndBufferedWrites
742+ }
724743 scanRec := canDistribute
725744 if n .estimatedRowCount != 0 {
726745 var suffix string
@@ -748,7 +767,7 @@ func checkSupportForPlanNode(
748767 return scanRec , nil
749768
750769 case * sortNode :
751- rec , err := checkSupportForPlanNode (ctx , n .input , distSQLVisitor , sd )
770+ rec , err := checkSupportForPlanNode (ctx , n .input , distSQLVisitor , sd , txnHasBufferedWrites )
752771 if err != nil {
753772 return cannotDistribute , err
754773 }
@@ -765,7 +784,7 @@ func checkSupportForPlanNode(
765784 return rec .compose (sortRec ), nil
766785
767786 case * topKNode :
768- rec , err := checkSupportForPlanNode (ctx , n .input , distSQLVisitor , sd )
787+ rec , err := checkSupportForPlanNode (ctx , n .input , distSQLVisitor , sd , txnHasBufferedWrites )
769788 if err != nil {
770789 return cannotDistribute , err
771790 }
@@ -785,11 +804,11 @@ func checkSupportForPlanNode(
785804 return canDistribute , nil
786805
787806 case * unionNode :
788- recLeft , err := checkSupportForPlanNode (ctx , n .left , distSQLVisitor , sd )
807+ recLeft , err := checkSupportForPlanNode (ctx , n .left , distSQLVisitor , sd , txnHasBufferedWrites )
789808 if err != nil {
790809 return cannotDistribute , err
791810 }
792- recRight , err := checkSupportForPlanNode (ctx , n .right , distSQLVisitor , sd )
811+ recRight , err := checkSupportForPlanNode (ctx , n .right , distSQLVisitor , sd , txnHasBufferedWrites )
793812 if err != nil {
794813 return cannotDistribute , err
795814 }
@@ -819,7 +838,7 @@ func checkSupportForPlanNode(
819838 return cannotDistribute , cannotDistributeVectorSearchErr
820839
821840 case * windowNode :
822- rec , err := checkSupportForPlanNode (ctx , n .input , distSQLVisitor , sd )
841+ rec , err := checkSupportForPlanNode (ctx , n .input , distSQLVisitor , sd , txnHasBufferedWrites )
823842 if err != nil {
824843 return cannotDistribute , err
825844 }
@@ -845,6 +864,10 @@ func checkSupportForPlanNode(
845864 // TODO(nvanbenschoten): lift this restriction.
846865 return cannotDistribute , cannotDistributeRowLevelLockingErr
847866 }
867+ if txnHasBufferedWrites && side .fetch .requiresMVCCDecoding () {
868+ // TODO(#144166): relax this.
869+ return cannotDistribute , cannotDistributeSystemColumnsAndBufferedWrites
870+ }
848871 }
849872 if err := checkExprForDistSQL (n .onCond , distSQLVisitor ); err != nil {
850873 return cannotDistribute , err
@@ -864,8 +887,9 @@ func checkSupportForInvertedFilterNode(
864887 n * invertedFilterNode ,
865888 distSQLVisitor * distSQLExprCheckVisitor ,
866889 sd * sessiondata.SessionData ,
890+ txnHasBufferedWrites bool ,
867891) (distRecommendation , error ) {
868- rec , err := checkSupportForPlanNode (ctx , n .input , distSQLVisitor , sd )
892+ rec , err := checkSupportForPlanNode (ctx , n .input , distSQLVisitor , sd , txnHasBufferedWrites )
869893 if err != nil {
870894 return cannotDistribute , err
871895 }
@@ -5399,7 +5423,16 @@ func checkScanParallelizationIfLocal(
53995423 if len (n .reqOrdering ) == 0 && n .parallelize {
54005424 hasScanNodeToParallelize = true
54015425 }
5402- case * distinctNode , * explainVecNode , * indexJoinNode , * limitNode ,
5426+ if n .fetchPlanningInfo .requiresMVCCDecoding () {
5427+ prohibitParallelization = true
5428+ return
5429+ }
5430+ case * indexJoinNode :
5431+ if n .fetch .requiresMVCCDecoding () {
5432+ prohibitParallelization = true
5433+ return
5434+ }
5435+ case * distinctNode , * explainVecNode , * limitNode ,
54035436 * ordinalityNode , * sortNode , * unionNode , * valuesNode :
54045437 default :
54055438 prohibitParallelization = true
0 commit comments