
Commit bba2441

Merge pull request #154446 from yuzefovich/blathers/backport-release-25.4-152201
release-25.4: sql: disable DistSQL when txn buffered writes and MVCC decoding is required
2 parents f6583c2 + 024761a commit bba2441

22 files changed: +423 −81 lines

pkg/kv/kvclient/kvcoord/txn_coord_sender.go

Lines changed: 11 additions & 0 deletions
@@ -1689,6 +1689,13 @@ func (tc *TxnCoordSender) HasPerformedWrites() bool {
 	return tc.hasPerformedWritesLocked()
 }
 
+// HasBufferedWrites is part of the TxnSender interface.
+func (tc *TxnCoordSender) HasBufferedWrites() bool {
+	tc.mu.Lock()
+	defer tc.mu.Unlock()
+	return tc.hasBufferedWritesLocked()
+}
+
 // TestingShouldRetry is part of the TxnSender interface.
 func (tc *TxnCoordSender) TestingShouldRetry() bool {
 	tc.mu.Lock()
@@ -1713,3 +1720,7 @@ func (tc *TxnCoordSender) hasPerformedReadsLocked() bool {
 func (tc *TxnCoordSender) hasPerformedWritesLocked() bool {
 	return tc.mu.txn.Sequence != 0
 }
+
+func (tc *TxnCoordSender) hasBufferedWritesLocked() bool {
+	return tc.interceptorAlloc.txnWriteBuffer.hasBufferedWrites()
+}
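
The new accessor follows the sender's existing lock-then-delegate idiom: the exported method takes tc.mu and hands off to a *Locked helper, so code that already holds the mutex can reuse the check. A minimal standalone sketch of that pattern (coordSender and the buffered-write count are hypothetical stand-ins, not the real kvcoord types):

package main

import (
	"fmt"
	"sync"
)

// coordSender is a toy stand-in for TxnCoordSender.
type coordSender struct {
	mu struct {
		sync.Mutex
		bufferedWrites int // stand-in for the txnWriteBuffer interceptor
	}
}

// HasBufferedWrites locks, then delegates to the *Locked variant,
// mirroring how the method is structured in the diff above.
func (cs *coordSender) HasBufferedWrites() bool {
	cs.mu.Lock()
	defer cs.mu.Unlock()
	return cs.hasBufferedWritesLocked()
}

func (cs *coordSender) hasBufferedWritesLocked() bool {
	return cs.mu.bufferedWrites > 0
}

func main() {
	var cs coordSender
	fmt.Println(cs.HasBufferedWrites()) // false
	cs.mu.Lock()
	cs.mu.bufferedWrites = 1
	cs.mu.Unlock()
	fmt.Println(cs.HasBufferedWrites()) // true
}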

pkg/kv/mock_transactional_sender.go

Lines changed: 5 additions & 0 deletions
@@ -282,6 +282,11 @@ func (m *MockTransactionalSender) HasPerformedWrites() bool {
 	panic("unimplemented")
 }
 
+// HasBufferedWrites is part of TxnSenderFactory.
+func (m *MockTransactionalSender) HasBufferedWrites() bool {
+	return false
+}
+
 // TestingShouldRetry is part of TxnSenderFactory.
 func (m *MockTransactionalSender) TestingShouldRetry() bool {
 	return false

pkg/kv/sender.go

Lines changed: 10 additions & 4 deletions
@@ -134,10 +134,12 @@ type TxnSender interface {
 	SetOmitInRangefeeds()
 
 	// SetBufferedWritesEnabled toggles whether the writes are buffered on the
-	// gateway node until the commit time. Only allowed on the RootTxn. Buffered
-	// writes cannot be enabled on a txn that performed any requests. When
-	// disabling buffered writes, if there are any writes in the buffer, they
-	// are flushed with the next BatchRequest.
+	// gateway node until the commit time. Buffered writes cannot be enabled on
+	// a txn that performed any requests. When disabling buffered writes, if
+	// there are any writes in the buffer, they are flushed with the next
+	// BatchRequest.
+	//
+	// Only allowed on the RootTxn.
 	SetBufferedWritesEnabled(bool)
 
 	// BufferedWritesEnabled returns whether the buffered writes are enabled.
@@ -379,6 +381,10 @@ type TxnSender interface {
 	// transaction's current epoch.
 	HasPerformedWrites() bool
 
+	// HasBufferedWrites returns true if a write has been buffered for the
+	// transaction's current epoch.
+	HasBufferedWrites() bool
+
 	// TestingShouldRetry returns true if transaction retry errors should be
 	// randomly returned to callers. Note that it is the responsibility of
 	// (*kv.DB).Txn() to return the retries. This lives here since the

pkg/kv/txn.go

Lines changed: 18 additions & 1 deletion
@@ -443,9 +443,18 @@ func (txn *Txn) debugNameLocked() string {
 	return fmt.Sprintf("%s (id: %s)", txn.mu.debugName, txn.mu.ID)
 }
 
+// SetBufferedWritesEnabled toggles whether the writes are buffered on the
+// gateway node until the commit time. Buffered writes cannot be enabled on a
+// txn that performed any requests. When disabling buffered writes, if there are
+// any writes in the buffer, they are flushed with the next BatchRequest.
+//
+// Only allowed on the RootTxn.
 func (txn *Txn) SetBufferedWritesEnabled(enabled bool) {
 	if txn.typ != RootTxn {
-		panic(errors.AssertionFailedf("SetBufferedWritesEnabled() called on leaf txn"))
+		panic(errors.AssertionFailedf(
+			"SetBufferedWritesEnabled(%t) called on leaf txn (buffer empty? %t)",
+			enabled, txn.HasBufferedWrites(),
+		))
 	}
 
 	txn.mu.Lock()
@@ -1862,6 +1871,14 @@ func (txn *Txn) HasPerformedWrites() bool {
 	return txn.mu.sender.HasPerformedWrites()
 }
 
+// HasBufferedWrites returns true if a write has been buffered for the
+// transaction's current epoch.
+func (txn *Txn) HasBufferedWrites() bool {
+	txn.mu.Lock()
+	defer txn.mu.Unlock()
+	return txn.mu.sender.HasBufferedWrites()
+}
+
 // AdmissionHeader returns the admission header for work done in the context
 // of this transaction.
 func (txn *Txn) AdmissionHeader() kvpb.AdmissionHeader {
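
The richer panic message costs nothing on the happy path and records both the requested state and the buffer's state at the moment of misuse. A small sketch of the message construction with github.com/cockroachdb/errors, mirroring the format string in the hunk above (the enabled/hasBuffered values are hypothetical stand-ins for the txn state):

package main

import (
	"fmt"

	"github.com/cockroachdb/errors"
)

func main() {
	// Hypothetical stand-ins for the inputs to SetBufferedWritesEnabled.
	enabled, hasBuffered := false, true
	err := errors.AssertionFailedf(
		"SetBufferedWritesEnabled(%t) called on leaf txn (buffer empty? %t)",
		enabled, hasBuffered,
	)
	fmt.Println(err)
}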

pkg/sql/apply_join.go

Lines changed: 1 addition & 4 deletions
@@ -315,10 +315,7 @@ func runPlanInsidePlan(
 		}
 	}
 
-	distributePlan, distSQLProhibitedErr := getPlanDistribution(
-		ctx, plannerCopy.Descriptors().HasUncommittedTypes(),
-		plannerCopy.SessionData(), plan.main, &plannerCopy.distSQLVisitor,
-	)
+	distributePlan, distSQLProhibitedErr := plannerCopy.getPlanDistribution(ctx, plan.main)
 	distributeType := DistributionType(LocalDistribution)
 	if distributePlan.WillDistribute() {
 		distributeType = FullDistribution

pkg/sql/colfetcher/cfetcher.go

Lines changed: 9 additions & 0 deletions
@@ -467,6 +467,15 @@ func (cf *cFetcher) Init(
 	// MVCC decoding.
 	if cf.mvccDecodeStrategy == storage.MVCCDecodingRequired {
 		if cf.txn != nil && cf.txn.BufferedWritesEnabled() {
+			if cf.txn.Type() == kv.LeafTxn {
+				// We're only allowed to disable buffered writes on the RootTxn.
+				// If we have a LeafTxn, we'll return an assertion error instead
+				// of crashing.
+				//
+				// Note that we might have a LeafTxn with no buffered writes, in
+				// which case BufferedWritesEnabled() is false.
+				return errors.AssertionFailedf("got LeafTxn when MVCC decoding is required")
+			}
 			cf.txn.SetBufferedWritesEnabled(false /* enabled */)
 		}
 	}
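
The guard matters because SetBufferedWritesEnabled panics on a LeafTxn (see the pkg/kv/txn.go hunk above), so the fetcher converts a misplanned leaf case into a returned assertion error instead of a process crash. A minimal sketch of the root/leaf guard, with toy types in place of *kv.Txn and the cFetcher:

package main

import (
	"fmt"

	"github.com/cockroachdb/errors"
)

type txnType int

const (
	rootTxn txnType = iota
	leafTxn
)

// toyTxn is a hypothetical stand-in for *kv.Txn.
type toyTxn struct {
	typ             txnType
	bufferedEnabled bool
}

func (t *toyTxn) disableBufferedWrites() { t.bufferedEnabled = false }

// initFetcher mirrors the shape of the cFetcher.Init guard: when MVCC
// decoding is required, a root txn flushes its buffer, while a leaf txn
// surfaces an assertion error rather than panicking.
func initFetcher(txn *toyTxn, mvccDecodingRequired bool) error {
	if mvccDecodingRequired && txn != nil && txn.bufferedEnabled {
		if txn.typ == leafTxn {
			return errors.AssertionFailedf("got LeafTxn when MVCC decoding is required")
		}
		txn.disableBufferedWrites()
	}
	return nil
}

func main() {
	fmt.Println(initFetcher(&toyTxn{typ: leafTxn, bufferedEnabled: true}, true))
	fmt.Println(initFetcher(&toyTxn{typ: rootTxn, bufferedEnabled: true}, true)) // <nil>
}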

pkg/sql/conn_executor_exec.go

Lines changed: 1 addition & 4 deletions
@@ -2847,10 +2847,7 @@ func (ex *connExecutor) dispatchToExecutionEngine(
 			}
 		}
 	}
-	distributePlan, distSQLProhibitedErr := getPlanDistribution(
-		ctx, planner.Descriptors().HasUncommittedTypes(),
-		ex.sessionData(), planner.curPlan.main, &planner.distSQLVisitor,
-	)
+	distributePlan, distSQLProhibitedErr := planner.getPlanDistribution(ctx, planner.curPlan.main)
 	if afterGetPlanDistribution != nil {
 		afterGetPlanDistribution()
 	}
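
Both call sites (here and in apply_join.go above) now go through a planner method instead of the free function, so every caller picks up the same inputs and any new input the check needs, such as the txn's buffered-writes state, can be added in one place. A hedged, self-contained sketch of that extract-method refactor; the types and field names here are illustrative toys, not the real sql.planner:

package main

import (
	"context"
	"fmt"
)

// Toy stand-ins: the real planner carries descriptors, session data, and a
// DistSQL visitor; here a single field represents all of them.
type planner struct{ sessionInfo string }

type plan struct{ name string }

// Free function that every caller previously invoked with the planner's
// fields spelled out one by one.
func getPlanDistribution(ctx context.Context, sessionInfo string, p plan) string {
	return fmt.Sprintf("plan %q under session %q", p.name, sessionInfo)
}

// Method wrapper: call sites shrink to one argument, and new inputs can be
// threaded through here once instead of at every call site.
func (pl *planner) getPlanDistribution(ctx context.Context, p plan) string {
	return getPlanDistribution(ctx, pl.sessionInfo, p)
}

func main() {
	pl := &planner{sessionInfo: "demo"}
	fmt.Println(pl.getPlanDistribution(context.Background(), plan{name: "main"}))
}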

pkg/sql/delete_range.go

Lines changed: 1 addition & 1 deletion
@@ -94,7 +94,7 @@ func (d *deleteRangeNode) startExec(params runParams) error {
 	// fetch kvs.
 	var spec fetchpb.IndexFetchSpec
 	if err := rowenc.InitIndexFetchSpec(
-		&spec, params.ExecCfg().Codec, d.desc, d.desc.GetPrimaryIndex(), nil, /* columnIDs */
+		&spec, params.ExecCfg().Codec, d.desc, d.desc.GetPrimaryIndex(), nil, /* fetchColumnIDs */
 	); err != nil {
 		return err
 	}

pkg/sql/distsql_physical_planner.go

Lines changed: 54 additions & 21 deletions
@@ -461,6 +461,9 @@ var (
 	cannotDistributeVectorSearchErr = newQueryNotSupportedError(
 		"vector search operation cannot be distributed",
 	)
+	cannotDistributeSystemColumnsAndBufferedWrites = newQueryNotSupportedError(
+		"system column (that requires MVCC decoding) is requested when writes have been buffered",
+	)
 )
 
 // mustWrapNode returns true if a node has no DistSQL-processor equivalent.
@@ -546,6 +549,7 @@ func checkSupportForPlanNode(
 	node planNode,
 	distSQLVisitor *distSQLExprCheckVisitor,
 	sd *sessiondata.SessionData,
+	txnHasBufferedWrites bool,
 ) (retRec distRecommendation, retErr error) {
 	if buildutil.CrdbTestBuild {
 		defer func() {
@@ -563,19 +567,19 @@ func checkSupportForPlanNode(
 		return shouldDistribute, nil
 
 	case *distinctNode:
-		return checkSupportForPlanNode(ctx, n.input, distSQLVisitor, sd)
+		return checkSupportForPlanNode(ctx, n.input, distSQLVisitor, sd, txnHasBufferedWrites)
 
 	case *exportNode:
-		return checkSupportForPlanNode(ctx, n.input, distSQLVisitor, sd)
+		return checkSupportForPlanNode(ctx, n.input, distSQLVisitor, sd, txnHasBufferedWrites)
 
 	case *filterNode:
 		if err := checkExprForDistSQL(n.filter, distSQLVisitor); err != nil {
 			return cannotDistribute, err
 		}
-		return checkSupportForPlanNode(ctx, n.input, distSQLVisitor, sd)
+		return checkSupportForPlanNode(ctx, n.input, distSQLVisitor, sd, txnHasBufferedWrites)
 
 	case *groupNode:
-		rec, err := checkSupportForPlanNode(ctx, n.input, distSQLVisitor, sd)
+		rec, err := checkSupportForPlanNode(ctx, n.input, distSQLVisitor, sd, txnHasBufferedWrites)
 		if err != nil {
 			return cannotDistribute, err
 		}
@@ -604,10 +608,14 @@ func checkSupportForPlanNode(
 			// TODO(nvanbenschoten): lift this restriction.
 			return cannotDistribute, cannotDistributeRowLevelLockingErr
 		}
-		return checkSupportForPlanNode(ctx, n.input, distSQLVisitor, sd)
+		if txnHasBufferedWrites && n.fetch.requiresMVCCDecoding() {
+			// TODO(#144166): relax this.
+			return cannotDistribute, cannotDistributeSystemColumnsAndBufferedWrites
+		}
+		return checkSupportForPlanNode(ctx, n.input, distSQLVisitor, sd, txnHasBufferedWrites)
 
 	case *invertedFilterNode:
-		return checkSupportForInvertedFilterNode(ctx, n, distSQLVisitor, sd)
+		return checkSupportForInvertedFilterNode(ctx, n, distSQLVisitor, sd, txnHasBufferedWrites)
 
 	case *invertedJoinNode:
 		if n.fetch.lockingStrength != descpb.ScanLockingStrength_FOR_NONE {
@@ -617,10 +625,14 @@ func checkSupportForPlanNode(
 			// TODO(nvanbenschoten): lift this restriction.
 			return cannotDistribute, cannotDistributeRowLevelLockingErr
 		}
+		if txnHasBufferedWrites && n.fetch.requiresMVCCDecoding() {
+			// TODO(#144166): relax this.
+			return cannotDistribute, cannotDistributeSystemColumnsAndBufferedWrites
+		}
 		if err := checkExprForDistSQL(n.onExpr, distSQLVisitor); err != nil {
 			return cannotDistribute, err
 		}
-		rec, err := checkSupportForPlanNode(ctx, n.input, distSQLVisitor, sd)
+		rec, err := checkSupportForPlanNode(ctx, n.input, distSQLVisitor, sd, txnHasBufferedWrites)
 		if err != nil {
 			return cannotDistribute, err
 		}
@@ -633,11 +645,11 @@ func checkSupportForPlanNode(
 		if err := checkExprForDistSQL(n.pred.onCond, distSQLVisitor); err != nil {
 			return cannotDistribute, err
 		}
-		recLeft, err := checkSupportForPlanNode(ctx, n.left, distSQLVisitor, sd)
+		recLeft, err := checkSupportForPlanNode(ctx, n.left, distSQLVisitor, sd, txnHasBufferedWrites)
 		if err != nil {
 			return cannotDistribute, err
 		}
-		recRight, err := checkSupportForPlanNode(ctx, n.right, distSQLVisitor, sd)
+		recRight, err := checkSupportForPlanNode(ctx, n.right, distSQLVisitor, sd, txnHasBufferedWrites)
 		if err != nil {
 			return cannotDistribute, err
 		}
@@ -658,7 +670,7 @@ func checkSupportForPlanNode(
 		// Note that we don't need to check whether we support distribution of
 		// n.countExpr or n.offsetExpr because those expressions are evaluated
 		// locally, during the physical planning.
-		return checkSupportForPlanNode(ctx, n.input, distSQLVisitor, sd)
+		return checkSupportForPlanNode(ctx, n.input, distSQLVisitor, sd, txnHasBufferedWrites)
 
 	case *lookupJoinNode:
 		if n.remoteLookupExpr != nil || n.remoteOnlyLookups {
@@ -672,7 +684,10 @@ func checkSupportForPlanNode(
 			// TODO(nvanbenschoten): lift this restriction.
 			return cannotDistribute, cannotDistributeRowLevelLockingErr
 		}
-
+		if txnHasBufferedWrites && n.fetch.requiresMVCCDecoding() {
+			// TODO(#144166): relax this.
+			return cannotDistribute, cannotDistributeSystemColumnsAndBufferedWrites
+		}
 		if err := checkExprForDistSQL(n.lookupExpr, distSQLVisitor); err != nil {
 			return cannotDistribute, err
 		}
@@ -682,7 +697,7 @@ func checkSupportForPlanNode(
 		if err := checkExprForDistSQL(n.onCond, distSQLVisitor); err != nil {
 			return cannotDistribute, err
 		}
-		rec, err := checkSupportForPlanNode(ctx, n.input, distSQLVisitor, sd)
+		rec, err := checkSupportForPlanNode(ctx, n.input, distSQLVisitor, sd, txnHasBufferedWrites)
 		if err != nil {
 			return cannotDistribute, err
 		}
@@ -699,15 +714,15 @@ func checkSupportForPlanNode(
 				return cannotDistribute, err
 			}
 		}
-		return checkSupportForPlanNode(ctx, n.input, distSQLVisitor, sd)
+		return checkSupportForPlanNode(ctx, n.input, distSQLVisitor, sd, txnHasBufferedWrites)
 
 	case *renderNode:
 		for _, e := range n.render {
 			if err := checkExprForDistSQL(e, distSQLVisitor); err != nil {
 				return cannotDistribute, err
 			}
 		}
-		return checkSupportForPlanNode(ctx, n.input, distSQLVisitor, sd)
+		return checkSupportForPlanNode(ctx, n.input, distSQLVisitor, sd, txnHasBufferedWrites)
 
 	case *scanNode:
 		if n.lockingStrength != descpb.ScanLockingStrength_FOR_NONE {
@@ -721,6 +736,10 @@ func checkSupportForPlanNode(
 			// This is a locality optimized scan.
 			return cannotDistribute, localityOptimizedOpNotDistributableErr
 		}
+		if txnHasBufferedWrites && n.fetchPlanningInfo.requiresMVCCDecoding() {
+			// TODO(#144166): relax this.
+			return cannotDistribute, cannotDistributeSystemColumnsAndBufferedWrites
+		}
 		scanRec := canDistribute
 		if n.estimatedRowCount != 0 {
 			var suffix string
@@ -748,7 +767,7 @@ func checkSupportForPlanNode(
 		return scanRec, nil
 
 	case *sortNode:
-		rec, err := checkSupportForPlanNode(ctx, n.input, distSQLVisitor, sd)
+		rec, err := checkSupportForPlanNode(ctx, n.input, distSQLVisitor, sd, txnHasBufferedWrites)
 		if err != nil {
 			return cannotDistribute, err
 		}
@@ -765,7 +784,7 @@ func checkSupportForPlanNode(
 		return rec.compose(sortRec), nil
 
 	case *topKNode:
-		rec, err := checkSupportForPlanNode(ctx, n.input, distSQLVisitor, sd)
+		rec, err := checkSupportForPlanNode(ctx, n.input, distSQLVisitor, sd, txnHasBufferedWrites)
 		if err != nil {
 			return cannotDistribute, err
 		}
@@ -785,11 +804,11 @@ func checkSupportForPlanNode(
 		return canDistribute, nil
 
 	case *unionNode:
-		recLeft, err := checkSupportForPlanNode(ctx, n.left, distSQLVisitor, sd)
+		recLeft, err := checkSupportForPlanNode(ctx, n.left, distSQLVisitor, sd, txnHasBufferedWrites)
 		if err != nil {
 			return cannotDistribute, err
 		}
-		recRight, err := checkSupportForPlanNode(ctx, n.right, distSQLVisitor, sd)
+		recRight, err := checkSupportForPlanNode(ctx, n.right, distSQLVisitor, sd, txnHasBufferedWrites)
 		if err != nil {
 			return cannotDistribute, err
 		}
@@ -819,7 +838,7 @@ func checkSupportForPlanNode(
 		return cannotDistribute, cannotDistributeVectorSearchErr
 
 	case *windowNode:
-		rec, err := checkSupportForPlanNode(ctx, n.input, distSQLVisitor, sd)
+		rec, err := checkSupportForPlanNode(ctx, n.input, distSQLVisitor, sd, txnHasBufferedWrites)
 		if err != nil {
 			return cannotDistribute, err
 		}
@@ -845,6 +864,10 @@ func checkSupportForPlanNode(
 				// TODO(nvanbenschoten): lift this restriction.
 				return cannotDistribute, cannotDistributeRowLevelLockingErr
 			}
+			if txnHasBufferedWrites && side.fetch.requiresMVCCDecoding() {
+				// TODO(#144166): relax this.
+				return cannotDistribute, cannotDistributeSystemColumnsAndBufferedWrites
+			}
 		}
 		if err := checkExprForDistSQL(n.onCond, distSQLVisitor); err != nil {
 			return cannotDistribute, err
@@ -864,8 +887,9 @@ func checkSupportForInvertedFilterNode(
 	n *invertedFilterNode,
 	distSQLVisitor *distSQLExprCheckVisitor,
 	sd *sessiondata.SessionData,
+	txnHasBufferedWrites bool,
 ) (distRecommendation, error) {
-	rec, err := checkSupportForPlanNode(ctx, n.input, distSQLVisitor, sd)
+	rec, err := checkSupportForPlanNode(ctx, n.input, distSQLVisitor, sd, txnHasBufferedWrites)
 	if err != nil {
 		return cannotDistribute, err
 	}
@@ -5399,7 +5423,16 @@ func checkScanParallelizationIfLocal(
 			if len(n.reqOrdering) == 0 && n.parallelize {
 				hasScanNodeToParallelize = true
 			}
-		case *distinctNode, *explainVecNode, *indexJoinNode, *limitNode,
+			if n.fetchPlanningInfo.requiresMVCCDecoding() {
+				prohibitParallelization = true
+				return
+			}
+		case *indexJoinNode:
+			if n.fetch.requiresMVCCDecoding() {
+				prohibitParallelization = true
+				return
+			}
+		case *distinctNode, *explainVecNode, *limitNode,
 			*ordinalityNode, *sortNode, *unionNode, *valuesNode:
 		default:
 			prohibitParallelization = true
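
The planner change is a textbook recursive plan walk: one new boolean rides along on every recursive call, and the fetch-backed nodes (scans and the join variants) short-circuit with the new query-not-supported error when the txn has buffered writes and the fetch needs MVCC decoding. A compact, self-contained sketch of that traversal shape; the node types and recommendation enum below are toy stand-ins for the planNode hierarchy, not the real planner code:

package main

import (
	"errors"
	"fmt"
)

type distRec int

const (
	cannotDistribute distRec = iota
	canDistribute
)

var errBufferedWritesMVCC = errors.New(
	"system column (that requires MVCC decoding) is requested when writes have been buffered")

// Toy plan nodes standing in for scanNode, filterNode, etc.
type node interface{}

type scan struct{ requiresMVCCDecoding bool }
type filter struct{ input node }

// checkSupport mirrors the shape of checkSupportForPlanNode: the flag is
// threaded through every recursive call, and fetch-backed leaves veto
// distribution when it is set.
func checkSupport(n node, txnHasBufferedWrites bool) (distRec, error) {
	switch n := n.(type) {
	case *scan:
		if txnHasBufferedWrites && n.requiresMVCCDecoding {
			return cannotDistribute, errBufferedWritesMVCC
		}
		return canDistribute, nil
	case *filter:
		return checkSupport(n.input, txnHasBufferedWrites)
	default:
		return cannotDistribute, fmt.Errorf("unsupported node %T", n)
	}
}

func main() {
	plan := &filter{input: &scan{requiresMVCCDecoding: true}}
	_, err := checkSupport(plan, true /* txnHasBufferedWrites */)
	fmt.Println(err) // vetoes distribution
}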
