Skip to content

Commit 067ae46

Browse files
committed
crosscluster: disable write buffering
The txnWriteBuffer does not yet support the OriginTimestamp and OriginID write options used by LDR. Soon, it will reject requests with these options set. This opts LDR out of this setting even in a cluster were the cluster setting has changed the default. Epic: none Release note: None
1 parent 6146578 commit 067ae46

File tree

4 files changed

+13
-4
lines changed

4 files changed

+13
-4
lines changed

pkg/crosscluster/logical/lww_kv_processor.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -224,6 +224,7 @@ func (p *kvRowProcessor) processOneRow(
224224
refreshCount int,
225225
) error {
226226
if err := p.cfg.DB.KV().Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
227+
txn.SetBufferedWritesEnabled(false)
227228
b := makeKVBatch(useLowPriority.Get(&p.cfg.Settings.SV), txn)
228229

229230
if err := p.addToBatch(ctx, txn, b, dstTableID, row, k, prevValue); err != nil {

pkg/crosscluster/logical/lww_row_processor.go

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -388,8 +388,9 @@ func (srp *sqlRowProcessor) GetLastRow() cdcevent.Row {
388388
}
389389

390390
var (
391-
forceGenericPlan = sessiondatapb.PlanCacheModeForceGeneric
392-
ieOverrideBase = sessiondata.InternalExecutorOverride{
391+
bufferedWritesEnabled = false
392+
forceGenericPlan = sessiondatapb.PlanCacheModeForceGeneric
393+
ieOverrideBase = sessiondata.InternalExecutorOverride{
393394
// The OriginIDForLogicalDataReplication session variable will bind the
394395
// origin ID 1 to each per-statement batch request header sent by the
395396
// internal executor. This metadata will be plumbed to the MVCCValueHeader
@@ -412,8 +413,9 @@ var (
412413
GrowStackSize: true,
413414
// We don't get any benefits from generating plan gists for internal
414415
// queries, so we disable them.
415-
DisablePlanGists: true,
416-
QualityOfService: &sessiondatapb.BulkLowQoS,
416+
DisablePlanGists: true,
417+
QualityOfService: &sessiondatapb.BulkLowQoS,
418+
BufferedWritesEnabled: &bufferedWritesEnabled,
417419
}
418420
)
419421

pkg/sql/internal.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -962,6 +962,9 @@ func applyOverrides(o sessiondata.InternalExecutorOverride, sd *sessiondata.Sess
962962
if o.DisablePlanGists {
963963
sd.DisablePlanGists = true
964964
}
965+
if o.BufferedWritesEnabled != nil {
966+
sd.BufferedWritesEnabled = *o.BufferedWritesEnabled
967+
}
965968

966969
if o.MultiOverride != "" {
967970
overrides := strings.Split(o.MultiOverride, ",")

pkg/sql/sessiondata/internal.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,9 @@ type InternalExecutorOverride struct {
8484
GrowStackSize bool
8585
// DisablePlanGists, if true, overrides the disable_plan_gists session var.
8686
DisablePlanGists bool
87+
// BufferedWritesEnabled, if set, controls whether the buffered writes KV transaction
88+
// protocol is used for user queries on the current session.
89+
BufferedWritesEnabled *bool
8790
}
8891

8992
// NoSessionDataOverride is the empty InternalExecutorOverride which does not

0 commit comments

Comments
 (0)