Skip to content

Commit 282e772

Browse files
authored
Merge pull request #154448 from yuzefovich/backport25.3-152201
release-25.3: sql: disable DistSQL when txn buffered writes and MVCC decoding is required
2 parents 08bccf9 + 213b7f7 commit 282e772

22 files changed

+423
-82
lines changed

pkg/kv/kvclient/kvcoord/txn_coord_sender.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1686,6 +1686,13 @@ func (tc *TxnCoordSender) HasPerformedWrites() bool {
16861686
return tc.hasPerformedWritesLocked()
16871687
}
16881688

1689+
// HasBufferedWrites is part of the TxnSender interface. It holds tc.mu for
// the duration of the call and delegates to hasBufferedWritesLocked.
1690+
func (tc *TxnCoordSender) HasBufferedWrites() bool {
1691+
tc.mu.Lock()
1692+
defer tc.mu.Unlock()
1693+
return tc.hasBufferedWritesLocked()
1694+
}
1695+
16891696
// TestingShouldRetry is part of the TxnSender interface.
16901697
func (tc *TxnCoordSender) TestingShouldRetry() bool {
16911698
tc.mu.Lock()
@@ -1710,3 +1717,7 @@ func (tc *TxnCoordSender) hasPerformedReadsLocked() bool {
17101717
func (tc *TxnCoordSender) hasPerformedWritesLocked() bool {
17111718
return tc.mu.txn.Sequence != 0
17121719
}
1720+
1721+
// hasBufferedWritesLocked returns the result of the txnWriteBuffer
// interceptor's hasBufferedWrites check. Callers are presumably required to
// hold tc.mu (as HasBufferedWrites does) -- confirm against the other
// *Locked helpers.
func (tc *TxnCoordSender) hasBufferedWritesLocked() bool {
1722+
return tc.interceptorAlloc.txnWriteBuffer.hasBufferedWrites()
1723+
}

pkg/kv/mock_transactional_sender.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -282,6 +282,11 @@ func (m *MockTransactionalSender) HasPerformedWrites() bool {
282282
panic("unimplemented")
283283
}
284284

285+
// HasBufferedWrites is part of the TxnSender interface. The mock always
// reports false.
286+
func (m *MockTransactionalSender) HasBufferedWrites() bool {
287+
return false
288+
}
289+
285290
// TestingShouldRetry is part of the TxnSender interface.
286291
func (m *MockTransactionalSender) TestingShouldRetry() bool {
287292
return false

pkg/kv/sender.go

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -134,10 +134,12 @@ type TxnSender interface {
134134
SetOmitInRangefeeds()
135135

136136
// SetBufferedWritesEnabled toggles whether the writes are buffered on the
137-
// gateway node until the commit time. Only allowed on the RootTxn. Buffered
138-
// writes cannot be enabled on a txn that performed any requests. When
139-
// disabling buffered writes, if there are any writes in the buffer, they
140-
// are flushed with the next BatchRequest.
137+
// gateway node until the commit time. Buffered writes cannot be enabled on
138+
// a txn that performed any requests. When disabling buffered writes, if
139+
// there are any writes in the buffer, they are flushed with the next
140+
// BatchRequest.
141+
//
142+
// Only allowed on the RootTxn.
141143
SetBufferedWritesEnabled(bool)
142144

143145
// BufferedWritesEnabled returns whether the buffered writes are enabled.
@@ -379,6 +381,10 @@ type TxnSender interface {
379381
// transaction's current epoch.
380382
HasPerformedWrites() bool
381383

384+
// HasBufferedWrites returns true if a write has been buffered for the
385+
// transaction's current epoch.
386+
HasBufferedWrites() bool
387+
382388
// TestingShouldRetry returns true if transaction retry errors should be
383389
// randomly returned to callers. Note that it is the responsibility of
384390
// (*kv.DB).Txn() to return the retries. This lives here since the

pkg/kv/txn.go

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -443,9 +443,18 @@ func (txn *Txn) debugNameLocked() string {
443443
return fmt.Sprintf("%s (id: %s)", txn.mu.debugName, txn.mu.ID)
444444
}
445445

446+
// SetBufferedWritesEnabled toggles whether the writes are buffered on the
447+
// gateway node until the commit time. Buffered writes cannot be enabled on a
448+
// txn that performed any requests. When disabling buffered writes, if there are
449+
// any writes in the buffer, they are flushed with the next BatchRequest.
450+
//
451+
// Only allowed on the RootTxn.
446452
func (txn *Txn) SetBufferedWritesEnabled(enabled bool) {
447453
if txn.typ != RootTxn {
448-
panic(errors.AssertionFailedf("SetBufferedWritesEnabled() called on leaf txn"))
454+
panic(errors.AssertionFailedf(
455+
"SetBufferedWritesEnabled(%t) called on leaf txn (buffer empty? %t)",
456+
enabled, txn.HasBufferedWrites(),
457+
))
449458
}
450459

451460
txn.mu.Lock()
@@ -1862,6 +1871,14 @@ func (txn *Txn) HasPerformedWrites() bool {
18621871
return txn.mu.sender.HasPerformedWrites()
18631872
}
18641873

1874+
// HasBufferedWrites returns true if a write has been buffered for the
1875+
// transaction's current epoch. It holds txn.mu while delegating to the
// underlying sender's HasBufferedWrites.
1876+
func (txn *Txn) HasBufferedWrites() bool {
1877+
txn.mu.Lock()
1878+
defer txn.mu.Unlock()
1879+
return txn.mu.sender.HasBufferedWrites()
1880+
}
1881+
18651882
// AdmissionHeader returns the admission header for work done in the context
18661883
// of this transaction.
18671884
func (txn *Txn) AdmissionHeader() kvpb.AdmissionHeader {

pkg/sql/apply_join.go

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -315,10 +315,7 @@ func runPlanInsidePlan(
315315
}
316316
}
317317

318-
distributePlan, distSQLProhibitedErr := getPlanDistribution(
319-
ctx, plannerCopy.Descriptors().HasUncommittedTypes(),
320-
plannerCopy.SessionData(), plan.main, &plannerCopy.distSQLVisitor,
321-
)
318+
distributePlan, distSQLProhibitedErr := plannerCopy.getPlanDistribution(ctx, plan.main)
322319
distributeType := DistributionType(LocalDistribution)
323320
if distributePlan.WillDistribute() {
324321
distributeType = FullDistribution

pkg/sql/colfetcher/cfetcher.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -447,6 +447,15 @@ func (cf *cFetcher) Init(
447447
// MVCC decoding.
448448
if cf.mvccDecodeStrategy == storage.MVCCDecodingRequired {
449449
if cf.txn != nil && cf.txn.BufferedWritesEnabled() {
450+
if cf.txn.Type() == kv.LeafTxn {
451+
// We're only allowed to disable buffered writes on the RootTxn.
452+
// If we have a LeafTxn, we'll return an assertion error instead
453+
// of crashing.
454+
//
455+
// Note that we might have a LeafTxn with no buffered writes, in
456+
// which case BufferedWritesEnabled() is false.
457+
return errors.AssertionFailedf("got LeafTxn when MVCC decoding is required")
458+
}
450459
cf.txn.SetBufferedWritesEnabled(false /* enabled */)
451460
}
452461
}

pkg/sql/conn_executor_exec.go

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3028,10 +3028,7 @@ func (ex *connExecutor) dispatchToExecutionEngine(
30283028
}
30293029
}
30303030
}
3031-
distributePlan, distSQLProhibitedErr := getPlanDistribution(
3032-
ctx, planner.Descriptors().HasUncommittedTypes(),
3033-
ex.sessionData(), planner.curPlan.main, &planner.distSQLVisitor,
3034-
)
3031+
distributePlan, distSQLProhibitedErr := planner.getPlanDistribution(ctx, planner.curPlan.main)
30353032
if afterGetPlanDistribution != nil {
30363033
afterGetPlanDistribution()
30373034
}

pkg/sql/delete_range.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,7 @@ func (d *deleteRangeNode) startExec(params runParams) error {
8989
// fetch kvs.
9090
var spec fetchpb.IndexFetchSpec
9191
if err := rowenc.InitIndexFetchSpec(
92-
&spec, params.ExecCfg().Codec, d.desc, d.desc.GetPrimaryIndex(), nil, /* columnIDs */
92+
&spec, params.ExecCfg().Codec, d.desc, d.desc.GetPrimaryIndex(), nil, /* fetchColumnIDs */
9393
); err != nil {
9494
return err
9595
}

pkg/sql/distsql_physical_planner.go

Lines changed: 54 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -461,6 +461,9 @@ var (
461461
cannotDistributeVectorSearchErr = newQueryNotSupportedError(
462462
"vector search operation cannot be distributed",
463463
)
464+
cannotDistributeSystemColumnsAndBufferedWrites = newQueryNotSupportedError(
465+
"system column (that requires MVCC decoding) is requested when writes have been buffered",
466+
)
464467
)
465468

466469
// mustWrapNode returns true if a node has no DistSQL-processor equivalent.
@@ -546,6 +549,7 @@ func checkSupportForPlanNode(
546549
node planNode,
547550
distSQLVisitor *distSQLExprCheckVisitor,
548551
sd *sessiondata.SessionData,
552+
txnHasBufferedWrites bool,
549553
) (retRec distRecommendation, retErr error) {
550554
if buildutil.CrdbTestBuild {
551555
defer func() {
@@ -563,19 +567,19 @@ func checkSupportForPlanNode(
563567
return shouldDistribute, nil
564568

565569
case *distinctNode:
566-
return checkSupportForPlanNode(ctx, n.input, distSQLVisitor, sd)
570+
return checkSupportForPlanNode(ctx, n.input, distSQLVisitor, sd, txnHasBufferedWrites)
567571

568572
case *exportNode:
569-
return checkSupportForPlanNode(ctx, n.input, distSQLVisitor, sd)
573+
return checkSupportForPlanNode(ctx, n.input, distSQLVisitor, sd, txnHasBufferedWrites)
570574

571575
case *filterNode:
572576
if err := checkExprForDistSQL(n.filter, distSQLVisitor); err != nil {
573577
return cannotDistribute, err
574578
}
575-
return checkSupportForPlanNode(ctx, n.input, distSQLVisitor, sd)
579+
return checkSupportForPlanNode(ctx, n.input, distSQLVisitor, sd, txnHasBufferedWrites)
576580

577581
case *groupNode:
578-
rec, err := checkSupportForPlanNode(ctx, n.input, distSQLVisitor, sd)
582+
rec, err := checkSupportForPlanNode(ctx, n.input, distSQLVisitor, sd, txnHasBufferedWrites)
579583
if err != nil {
580584
return cannotDistribute, err
581585
}
@@ -604,10 +608,14 @@ func checkSupportForPlanNode(
604608
// TODO(nvanbenschoten): lift this restriction.
605609
return cannotDistribute, cannotDistributeRowLevelLockingErr
606610
}
607-
return checkSupportForPlanNode(ctx, n.input, distSQLVisitor, sd)
611+
if txnHasBufferedWrites && n.fetch.requiresMVCCDecoding() {
612+
// TODO(#144166): relax this.
613+
return cannotDistribute, cannotDistributeSystemColumnsAndBufferedWrites
614+
}
615+
return checkSupportForPlanNode(ctx, n.input, distSQLVisitor, sd, txnHasBufferedWrites)
608616

609617
case *invertedFilterNode:
610-
return checkSupportForInvertedFilterNode(ctx, n, distSQLVisitor, sd)
618+
return checkSupportForInvertedFilterNode(ctx, n, distSQLVisitor, sd, txnHasBufferedWrites)
611619

612620
case *invertedJoinNode:
613621
if n.fetch.lockingStrength != descpb.ScanLockingStrength_FOR_NONE {
@@ -617,10 +625,14 @@ func checkSupportForPlanNode(
617625
// TODO(nvanbenschoten): lift this restriction.
618626
return cannotDistribute, cannotDistributeRowLevelLockingErr
619627
}
628+
if txnHasBufferedWrites && n.fetch.requiresMVCCDecoding() {
629+
// TODO(#144166): relax this.
630+
return cannotDistribute, cannotDistributeSystemColumnsAndBufferedWrites
631+
}
620632
if err := checkExprForDistSQL(n.onExpr, distSQLVisitor); err != nil {
621633
return cannotDistribute, err
622634
}
623-
rec, err := checkSupportForPlanNode(ctx, n.input, distSQLVisitor, sd)
635+
rec, err := checkSupportForPlanNode(ctx, n.input, distSQLVisitor, sd, txnHasBufferedWrites)
624636
if err != nil {
625637
return cannotDistribute, err
626638
}
@@ -633,11 +645,11 @@ func checkSupportForPlanNode(
633645
if err := checkExprForDistSQL(n.pred.onCond, distSQLVisitor); err != nil {
634646
return cannotDistribute, err
635647
}
636-
recLeft, err := checkSupportForPlanNode(ctx, n.left, distSQLVisitor, sd)
648+
recLeft, err := checkSupportForPlanNode(ctx, n.left, distSQLVisitor, sd, txnHasBufferedWrites)
637649
if err != nil {
638650
return cannotDistribute, err
639651
}
640-
recRight, err := checkSupportForPlanNode(ctx, n.right, distSQLVisitor, sd)
652+
recRight, err := checkSupportForPlanNode(ctx, n.right, distSQLVisitor, sd, txnHasBufferedWrites)
641653
if err != nil {
642654
return cannotDistribute, err
643655
}
@@ -658,7 +670,7 @@ func checkSupportForPlanNode(
658670
// Note that we don't need to check whether we support distribution of
659671
// n.countExpr or n.offsetExpr because those expressions are evaluated
660672
// locally, during the physical planning.
661-
return checkSupportForPlanNode(ctx, n.input, distSQLVisitor, sd)
673+
return checkSupportForPlanNode(ctx, n.input, distSQLVisitor, sd, txnHasBufferedWrites)
662674

663675
case *lookupJoinNode:
664676
if n.remoteLookupExpr != nil || n.remoteOnlyLookups {
@@ -672,7 +684,10 @@ func checkSupportForPlanNode(
672684
// TODO(nvanbenschoten): lift this restriction.
673685
return cannotDistribute, cannotDistributeRowLevelLockingErr
674686
}
675-
687+
if txnHasBufferedWrites && n.fetch.requiresMVCCDecoding() {
688+
// TODO(#144166): relax this.
689+
return cannotDistribute, cannotDistributeSystemColumnsAndBufferedWrites
690+
}
676691
if err := checkExprForDistSQL(n.lookupExpr, distSQLVisitor); err != nil {
677692
return cannotDistribute, err
678693
}
@@ -682,7 +697,7 @@ func checkSupportForPlanNode(
682697
if err := checkExprForDistSQL(n.onCond, distSQLVisitor); err != nil {
683698
return cannotDistribute, err
684699
}
685-
rec, err := checkSupportForPlanNode(ctx, n.input, distSQLVisitor, sd)
700+
rec, err := checkSupportForPlanNode(ctx, n.input, distSQLVisitor, sd, txnHasBufferedWrites)
686701
if err != nil {
687702
return cannotDistribute, err
688703
}
@@ -699,15 +714,15 @@ func checkSupportForPlanNode(
699714
return cannotDistribute, err
700715
}
701716
}
702-
return checkSupportForPlanNode(ctx, n.input, distSQLVisitor, sd)
717+
return checkSupportForPlanNode(ctx, n.input, distSQLVisitor, sd, txnHasBufferedWrites)
703718

704719
case *renderNode:
705720
for _, e := range n.render {
706721
if err := checkExprForDistSQL(e, distSQLVisitor); err != nil {
707722
return cannotDistribute, err
708723
}
709724
}
710-
return checkSupportForPlanNode(ctx, n.input, distSQLVisitor, sd)
725+
return checkSupportForPlanNode(ctx, n.input, distSQLVisitor, sd, txnHasBufferedWrites)
711726

712727
case *scanNode:
713728
if n.lockingStrength != descpb.ScanLockingStrength_FOR_NONE {
@@ -721,6 +736,10 @@ func checkSupportForPlanNode(
721736
// This is a locality optimized scan.
722737
return cannotDistribute, localityOptimizedOpNotDistributableErr
723738
}
739+
if txnHasBufferedWrites && n.fetchPlanningInfo.requiresMVCCDecoding() {
740+
// TODO(#144166): relax this.
741+
return cannotDistribute, cannotDistributeSystemColumnsAndBufferedWrites
742+
}
724743
scanRec := canDistribute
725744
if n.estimatedRowCount != 0 {
726745
var suffix string
@@ -748,7 +767,7 @@ func checkSupportForPlanNode(
748767
return scanRec, nil
749768

750769
case *sortNode:
751-
rec, err := checkSupportForPlanNode(ctx, n.input, distSQLVisitor, sd)
770+
rec, err := checkSupportForPlanNode(ctx, n.input, distSQLVisitor, sd, txnHasBufferedWrites)
752771
if err != nil {
753772
return cannotDistribute, err
754773
}
@@ -765,7 +784,7 @@ func checkSupportForPlanNode(
765784
return rec.compose(sortRec), nil
766785

767786
case *topKNode:
768-
rec, err := checkSupportForPlanNode(ctx, n.input, distSQLVisitor, sd)
787+
rec, err := checkSupportForPlanNode(ctx, n.input, distSQLVisitor, sd, txnHasBufferedWrites)
769788
if err != nil {
770789
return cannotDistribute, err
771790
}
@@ -785,11 +804,11 @@ func checkSupportForPlanNode(
785804
return canDistribute, nil
786805

787806
case *unionNode:
788-
recLeft, err := checkSupportForPlanNode(ctx, n.left, distSQLVisitor, sd)
807+
recLeft, err := checkSupportForPlanNode(ctx, n.left, distSQLVisitor, sd, txnHasBufferedWrites)
789808
if err != nil {
790809
return cannotDistribute, err
791810
}
792-
recRight, err := checkSupportForPlanNode(ctx, n.right, distSQLVisitor, sd)
811+
recRight, err := checkSupportForPlanNode(ctx, n.right, distSQLVisitor, sd, txnHasBufferedWrites)
793812
if err != nil {
794813
return cannotDistribute, err
795814
}
@@ -819,7 +838,7 @@ func checkSupportForPlanNode(
819838
return cannotDistribute, cannotDistributeVectorSearchErr
820839

821840
case *windowNode:
822-
rec, err := checkSupportForPlanNode(ctx, n.input, distSQLVisitor, sd)
841+
rec, err := checkSupportForPlanNode(ctx, n.input, distSQLVisitor, sd, txnHasBufferedWrites)
823842
if err != nil {
824843
return cannotDistribute, err
825844
}
@@ -845,6 +864,10 @@ func checkSupportForPlanNode(
845864
// TODO(nvanbenschoten): lift this restriction.
846865
return cannotDistribute, cannotDistributeRowLevelLockingErr
847866
}
867+
if txnHasBufferedWrites && side.fetch.requiresMVCCDecoding() {
868+
// TODO(#144166): relax this.
869+
return cannotDistribute, cannotDistributeSystemColumnsAndBufferedWrites
870+
}
848871
}
849872
if err := checkExprForDistSQL(n.onCond, distSQLVisitor); err != nil {
850873
return cannotDistribute, err
@@ -864,8 +887,9 @@ func checkSupportForInvertedFilterNode(
864887
n *invertedFilterNode,
865888
distSQLVisitor *distSQLExprCheckVisitor,
866889
sd *sessiondata.SessionData,
890+
txnHasBufferedWrites bool,
867891
) (distRecommendation, error) {
868-
rec, err := checkSupportForPlanNode(ctx, n.input, distSQLVisitor, sd)
892+
rec, err := checkSupportForPlanNode(ctx, n.input, distSQLVisitor, sd, txnHasBufferedWrites)
869893
if err != nil {
870894
return cannotDistribute, err
871895
}
@@ -5398,7 +5422,16 @@ func checkScanParallelizationIfLocal(
53985422
if len(n.reqOrdering) == 0 && n.parallelize {
53995423
hasScanNodeToParallelize = true
54005424
}
5401-
case *distinctNode, *explainVecNode, *indexJoinNode, *limitNode,
5425+
if n.fetchPlanningInfo.requiresMVCCDecoding() {
5426+
prohibitParallelization = true
5427+
return
5428+
}
5429+
case *indexJoinNode:
5430+
if n.fetch.requiresMVCCDecoding() {
5431+
prohibitParallelization = true
5432+
return
5433+
}
5434+
case *distinctNode, *explainVecNode, *limitNode,
54025435
*ordinalityNode, *sortNode, *unionNode, *valuesNode:
54035436
default:
54045437
prohibitParallelization = true

0 commit comments

Comments
 (0)