Skip to content

Commit 262c0e0

Browse files
craig[bot]michae2
andcommitted
Merge #146860
146860: sql: add exponential backoff to read committed stmt retry loop r=yuzefovich,rafiss a=michae2 Testing has shown that adding exponential backoff significantly improves throughput of highly contentious read committed workloads. Informs: #145377 Release note (sql change): Add session variable `initial_retry_backoff_for_read_committed` which controls the initial backoff duration when retrying an individual statement in an explicit READ COMMITTED transaction. A duration of 0 disables exponential backoff. If a statement in an explicit transaction is failing with the following 40001 error: ``` ERROR: restart transaction: read committed retry limit exceeded; set by max_retries_for_read_committed=... ``` Then `initial_retry_backoff_for_read_committed` should be set to a duration proportional to the typical execution time of the statement (in addition to also increasing `max_retries_for_read_committed`). Co-authored-by: Michael Erickson <[email protected]>
2 parents ce23a35 + e48e814 commit 262c0e0

File tree

9 files changed

+121
-27
lines changed

9 files changed

+121
-27
lines changed

pkg/sql/conn_executor_exec.go

Lines changed: 49 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@ import (
6969
"github.com/cockroachdb/cockroach/pkg/util/log/severity"
7070
"github.com/cockroachdb/cockroach/pkg/util/metamorphic"
7171
"github.com/cockroachdb/cockroach/pkg/util/metric"
72+
"github.com/cockroachdb/cockroach/pkg/util/retry"
7273
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
7374
"github.com/cockroachdb/cockroach/pkg/util/tracing"
7475
"github.com/cockroachdb/cockroach/pkg/util/tracing/tracingpb"
@@ -2560,18 +2561,18 @@ func (ex *connExecutor) commitSQLTransactionInternal(ctx context.Context) (retEr
25602561

25612562
ex.extraTxnState.prepStmtsNamespace.closePortals(ctx, &ex.extraTxnState.prepStmtsNamespaceMemAcc)
25622563

2563-
// We need to step the transaction's internal read sequence before committing
2564-
// if it has stepping enabled. If it doesn't have stepping enabled, then we
2565-
// just set the stepping mode back to what it was.
2564+
// We need to step the transaction's read sequence before committing if it has
2565+
// stepping enabled. If it doesn't have stepping enabled, then we just set the
2566+
// stepping mode back to what it was.
25662567
//
2567-
// Even if we do step the transaction's internal read sequence, we do not
2568-
// advance its external read timestamp (applicable only to read committed
2569-
// transactions). This is because doing so is not needed before committing,
2570-
// and it would cause the transaction to commit at a higher timestamp than
2571-
// necessary. On heavily contended workloads like the one from #109628, this
2572-
// can cause unnecessary write-write contention between transactions by
2573-
// inflating the contention footprint of each transaction (i.e. the duration
2574-
// measured in MVCC time that the transaction holds locks).
2568+
// Even if we do step the transaction's read sequence, we do not advance its
2569+
// read timestamp (applicable only to read committed transactions). This is
2570+
// because doing so is not needed before committing, and it would cause the
2571+
// transaction to commit at a higher timestamp than necessary. On heavily
2572+
// contended workloads like the one from #109628, this can cause unnecessary
2573+
// write-write contention between transactions by inflating the contention
2574+
// footprint of each transaction (i.e. the duration measured in MVCC time that
2575+
// the transaction holds locks).
25752576
prevSteppingMode := ex.state.mu.txn.ConfigureStepping(ctx, kv.SteppingEnabled)
25762577
if prevSteppingMode == kv.SteppingEnabled {
25772578
if err := ex.state.mu.txn.Step(ctx, false /* allowReadTimestampStep */); err != nil {
@@ -2722,6 +2723,14 @@ func (ex *connExecutor) rollbackSQLTransaction(
27222723
func (ex *connExecutor) dispatchReadCommittedStmtToExecutionEngine(
27232724
ctx context.Context, p *planner, res RestrictedCommandResult,
27242725
) error {
2726+
if ex.executorType == executorTypeInternal {
2727+
// Because we step the read timestamp below, this is not safe to call within
2728+
// internal executor.
2729+
return errors.AssertionFailedf(
2730+
"call of dispatchReadCommittedStmtToExecutionEngine within internal executor",
2731+
)
2732+
}
2733+
27252734
getPausablePortalInfo := func() *portalPauseInfo {
27262735
if p != nil && p.pausablePortal != nil {
27272736
return p.pausablePortal.pauseInfo
@@ -2738,8 +2747,19 @@ func (ex *connExecutor) dispatchReadCommittedStmtToExecutionEngine(
27382747
return err
27392748
}
27402749

2750+
// Use retry with exponential backoff and full jitter to reduce collisions for
2751+
// high-contention workloads. See https://en.wikipedia.org/wiki/Exponential_backoff and
2752+
// https://aws.amazon.com/blogs/architecture/exponential-backoff-and-jitter/
27412753
maxRetries := int(ex.sessionData().MaxRetriesForReadCommitted)
2742-
for attemptNum := 0; ; attemptNum++ {
2754+
initialBackoff := ex.sessionData().InitialRetryBackoffForReadCommitted
2755+
useBackoff := initialBackoff > 0
2756+
opts := retry.Options{
2757+
InitialBackoff: initialBackoff,
2758+
MaxBackoff: 1024 * initialBackoff,
2759+
Multiplier: 2.0,
2760+
RandomizationFactor: 1.0,
2761+
}
2762+
for attemptNum, r := 0, retry.StartWithCtx(ctx, opts); !useBackoff || r.Next(); attemptNum++ {
27432763
// TODO(99410): Fix the phase time for pausable portals.
27442764
startExecTS := crtime.NowMono()
27452765
ex.statsCollector.PhaseTimes().SetSessionPhaseTime(sessionphase.PlannerMostRecentStartExecStmt, startExecTS)
@@ -2749,6 +2769,15 @@ func (ex *connExecutor) dispatchReadCommittedStmtToExecutionEngine(
27492769
ex.sessionTracing.TraceRetryInformation(
27502770
ctx, "statement", p.autoRetryStmtCounter, p.autoRetryStmtReason,
27512771
)
2772+
// Step both the sequence number and the read timestamp so that we can see
2773+
// the results of the conflicting transactions that caused us to fail and
2774+
// any other transactions that occurred in the meantime.
2775+
if err := ex.state.mu.txn.Step(ctx, true /* allowReadTimestampStep */); err != nil {
2776+
return err
2777+
}
2778+
// Also step statement_timestamp so that any SQL using it is up-to-date.
2779+
stmtTS := ex.server.cfg.Clock.PhysicalTime()
2780+
p.extendedEvalCtx.StmtTimestamp = stmtTS
27522781
}
27532782
bufferPos := res.BufferedResultsLen()
27542783
if err = ex.dispatchToExecutionEngine(ctx, p, res); err != nil {
@@ -2802,9 +2831,6 @@ func (ex *connExecutor) dispatchReadCommittedStmtToExecutionEngine(
28022831
if err := ex.state.mu.txn.PrepareForPartialRetry(ctx); err != nil {
28032832
return err
28042833
}
2805-
if err := ex.state.mu.txn.Step(ctx, false /* allowReadTimestampStep */); err != nil {
2806-
return err
2807-
}
28082834
p.autoRetryStmtCounter++
28092835
p.autoRetryStmtReason = maybeRetriableErr
28102836
if ppInfo := getPausablePortalInfo(); ppInfo != nil {
@@ -2813,6 +2839,14 @@ func (ex *connExecutor) dispatchReadCommittedStmtToExecutionEngine(
28132839
}
28142840
ex.metrics.EngineMetrics.StatementRetryCount.Inc(1)
28152841
}
2842+
// Check if we exited the loop due to cancelation.
2843+
if useBackoff {
2844+
select {
2845+
case <-ctx.Done():
2846+
res.SetError(cancelchecker.QueryCanceledError)
2847+
default:
2848+
}
2849+
}
28162850
return nil
28172851
}
28182852

pkg/sql/exec_util.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4186,6 +4186,10 @@ func (m *sessionDataMutator) SetOptimizerUseExistsFilterHoistRule(val bool) {
41864186
m.data.OptimizerUseExistsFilterHoistRule = val
41874187
}
41884188

4189+
func (m *sessionDataMutator) SetInitialRetryBackoffForReadCommitted(val time.Duration) {
4190+
m.data.InitialRetryBackoffForReadCommitted = val
4191+
}
4192+
41894193
// Utility functions related to scrubbing sensitive information on SQL Stats.
41904194

41914195
// quantizeCounts ensures that the Count field in the

pkg/sql/explain_bundle.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1096,7 +1096,7 @@ func (c *stmtEnvCollector) PrintSessionSettings(w io.Writer, sv *settings.Values
10961096
switch varName {
10971097
case "idle_in_session_timeout", "idle_in_transaction_session_timeout",
10981098
"idle_session_timeout", "lock_timeout", "deadlock_timeout",
1099-
"statement_timeout", "transaction_timeout":
1099+
"statement_timeout", "transaction_timeout", "initial_retry_backoff_for_read_committed":
11001100
// Defaults for timeout settings are of the duration type (i.e.
11011101
// "0s"), so we'll parse it to extract the number of
11021102
// milliseconds (which is what the session variable uses).

pkg/sql/logictest/testdata/logic_test/information_schema

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4011,6 +4011,7 @@ idle_in_transaction_session_timeout 0
40114011
idle_session_timeout 0
40124012
index_join_streamer_batch_size 8.0 MiB
40134013
index_recommendations_enabled off
4014+
initial_retry_backoff_for_read_committed 0
40144015
inject_retry_errors_enabled off
40154016
inject_retry_errors_on_commit_enabled off
40164017
integer_datetimes on

pkg/sql/logictest/testdata/logic_test/pg_catalog

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3005,6 +3005,7 @@ idle_in_transaction_session_timeout 0 N
30053005
idle_session_timeout 0 NULL NULL NULL string
30063006
index_join_streamer_batch_size 8.0 MiB NULL NULL NULL string
30073007
index_recommendations_enabled off NULL NULL NULL string
3008+
initial_retry_backoff_for_read_committed 0 NULL NULL NULL string
30083009
inject_retry_errors_enabled off NULL NULL NULL string
30093010
inject_retry_errors_on_commit_enabled off NULL NULL NULL string
30103011
integer_datetimes on NULL NULL NULL string
@@ -3235,6 +3236,7 @@ idle_in_transaction_session_timeout 0 N
32353236
idle_session_timeout 0 NULL user NULL 0s 0s
32363237
index_join_streamer_batch_size 8.0 MiB NULL user NULL 8.0 MiB 8.0 MiB
32373238
index_recommendations_enabled off NULL user NULL on false
3239+
initial_retry_backoff_for_read_committed 0 NULL user NULL 0s 0s
32383240
inject_retry_errors_enabled off NULL user NULL off off
32393241
inject_retry_errors_on_commit_enabled off NULL user NULL off off
32403242
integer_datetimes on NULL user NULL on on
@@ -3453,6 +3455,7 @@ idle_in_transaction_session_timeout NULL NULL NULL
34533455
idle_session_timeout NULL NULL NULL NULL NULL
34543456
index_join_streamer_batch_size NULL NULL NULL NULL NULL
34553457
index_recommendations_enabled NULL NULL NULL NULL NULL
3458+
initial_retry_backoff_for_read_committed NULL NULL NULL NULL NULL
34563459
inject_retry_errors_enabled NULL NULL NULL NULL NULL
34573460
inject_retry_errors_on_commit_enabled NULL NULL NULL NULL NULL
34583461
integer_datetimes NULL NULL NULL NULL NULL

pkg/sql/logictest/testdata/logic_test/show_source

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,7 @@ idle_in_transaction_session_timeout 0
119119
idle_session_timeout 0
120120
index_join_streamer_batch_size 8.0 MiB
121121
index_recommendations_enabled off
122+
initial_retry_backoff_for_read_committed 0
122123
inject_retry_errors_enabled off
123124
inject_retry_errors_on_commit_enabled off
124125
integer_datetimes on

pkg/sql/sessiondatapb/local_only_session_data.proto

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -670,6 +670,10 @@ message LocalOnlySessionData {
670670
// OptimizerUseExistsFilterHoistRule, when true, causes the optimizer to apply
671671
// the HoistUnboundFilterFromExistsSubquery rule to EXISTS conditions.
672672
bool optimizer_use_exists_filter_hoist_rule = 170;
673+
// InitialRetryBackoffForReadCommitted controls the initial backoff
674+
// duration for automatic retries of statements in explicit READ COMMITTED
675+
// transactions that see a transaction retry error. 0 disables backoff.
676+
int64 initial_retry_backoff_for_read_committed = 171 [(gogoproto.casttype) = "time.Duration"];
673677
///////////////////////////////////////////////////////////////////////////
674678
// WARNING: consider whether a session parameter you're adding needs to //
675679
// be propagated to the remote nodes. If so, that parameter should live //

pkg/sql/vars.go

Lines changed: 31 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2248,8 +2248,10 @@ var varGen = map[string]sessionVar{
22482248
},
22492249

22502250
// CockroachDB extension. Configures the maximum number of automatic retries
2251-
// to perform for statements in explicit READ COMMITTED transactions that
2252-
// see a transaction retry error.
2251+
// to perform for statements in explicit READ COMMITTED transactions that see
2252+
// a transaction retry error. (See also
2253+
// initial_retry_backoff_for_read_committed which should be tuned with
2254+
// max_retries_for_read_committed.)
22532255
`max_retries_for_read_committed`: {
22542256
GetStringVal: makeIntGetStringValFn(`max_retries_for_read_committed`),
22552257
Set: func(_ context.Context, m sessionDataMutator, s string) error {
@@ -4034,6 +4036,33 @@ var varGen = map[string]sessionVar{
40344036
},
40354037
GlobalDefault: globalTrue,
40364038
},
4039+
4040+
// CockroachDB extension. Configures the initial backoff duration for
4041+
// automatic retries of statements in explicit READ COMMITTED transactions
4042+
// that see a transaction retry error. For statements experiencing contention
4043+
// under READ COMMITTED isolation, this should be set to a duration
4044+
// proportional to the typical execution time of the statement (in addition to
4045+
// also increasing `max_retries_for_read_committed`).
4046+
`initial_retry_backoff_for_read_committed`: {
4047+
GetStringVal: makeTimeoutVarGetter(`initial_retry_backoff_for_read_committed`),
4048+
Set: func(_ context.Context, m sessionDataMutator, s string) error {
4049+
duration, err := validateTimeoutVar(m.data.GetIntervalStyle(), s,
4050+
"initial_retry_backoff_for_read_committed",
4051+
)
4052+
if err != nil {
4053+
return err
4054+
}
4055+
m.SetInitialRetryBackoffForReadCommitted(duration)
4056+
return nil
4057+
},
4058+
Get: func(evalCtx *extendedEvalContext, _ *kv.Txn) (string, error) {
4059+
ms := evalCtx.SessionData().InitialRetryBackoffForReadCommitted.Nanoseconds() / int64(time.Millisecond)
4060+
return strconv.FormatInt(ms, 10), nil
4061+
},
4062+
GlobalDefault: func(sv *settings.Values) string {
4063+
return "0s"
4064+
},
4065+
},
40374066
}
40384067

40394068
func ReplicationModeFromString(s string) (sessiondatapb.ReplicationMode, error) {

pkg/workload/kv/kv.go

Lines changed: 27 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,7 @@ type kv struct {
9393
insertCount int
9494
txnQoS string
9595
prepareReadOnly bool
96+
writesUseSelect1 bool
9697
}
9798

9899
func init() {
@@ -129,6 +130,7 @@ var kvMeta = workload.Meta{
129130
`scatter`: {RuntimeOnly: true},
130131
`timeout`: {RuntimeOnly: true},
131132
`prepare-read-only`: {RuntimeOnly: true},
133+
`sel1-writes`: {RuntimeOnly: true},
132134
}
133135
g.flags.IntVar(&g.batchSize, `batch`, 1,
134136
`Number of blocks to read/insert in a single SQL statement.`)
@@ -178,11 +180,13 @@ var kvMeta = workload.Meta{
178180
g.flags.IntVar(&g.keySize, `key-size`, 0,
179181
`Use string key of appropriate size instead of int`)
180182
g.flags.DurationVar(&g.sfuDelay, `sfu-wait-delay`, 10*time.Millisecond,
181-
`Delay before sfu write transaction commits or aborts`)
183+
`Delay after SFU when using --sfu-writes (or after SELECT 1 when using --sel1-writes).`)
182184
g.flags.StringVar(&g.txnQoS, `txn-qos`, `regular`,
183185
`Set default_transaction_quality_of_service session variable, accepted`+
184186
`values are 'background', 'regular' and 'critical'.`)
185187
g.flags.BoolVar(&g.prepareReadOnly, `prepare-read-only`, false, `Prepare and perform only read statements.`)
188+
g.flags.BoolVar(&g.writesUseSelect1, `sel1-writes`, false,
189+
`Use SELECT 1 as the first statement of transactional writes with a sleep after SELECT 1.`)
186190
g.connFlags = workload.NewConnFlags(&g.flags)
187191
return g
188192
},
@@ -550,6 +554,7 @@ func (w *kv) Ops(
550554
if len(sfuStmtStr) > 0 && !op.config.prepareReadOnly {
551555
op.sfuStmt = op.sr.Define(sfuStmtStr)
552556
}
557+
op.sel1Stmt = op.sr.Define("SELECT 1")
553558
op.spanStmt = op.sr.Define(spanStmtStr)
554559
if w.txnQoS != `regular` {
555560
stmt := op.sr.Define(fmt.Sprintf(
@@ -582,6 +587,7 @@ type kvOp struct {
582587
writeStmt workload.StmtHandle
583588
spanStmt workload.StmtHandle
584589
sfuStmt workload.StmtHandle
590+
sel1Stmt workload.StmtHandle
585591
delStmt workload.StmtHandle
586592
g keyGenerator
587593
t keyTransformer
@@ -680,7 +686,7 @@ func (o *kvOp) run(ctx context.Context) (retErr error) {
680686
}
681687
start := timeutil.Now()
682688
var err error
683-
if o.config.writesUseSelectForUpdate {
689+
if o.config.writesUseSelect1 || o.config.writesUseSelectForUpdate {
684690
// We could use crdb.ExecuteTx, but we avoid retries in this workload so
685691
// that each run call makes 1 attempt, so that rate limiting in workerRun
686692
// behaves as expected.
@@ -696,15 +702,27 @@ func (o *kvOp) run(ctx context.Context) (retErr error) {
696702
retErr = errors.CombineErrors(retErr, rollbackErr)
697703
}
698704
}()
699-
rows, err := o.sfuStmt.QueryTx(ctx, tx, sfuArgs...)
700-
if err != nil {
701-
return err
705+
if o.config.writesUseSelect1 {
706+
rows, err := o.sel1Stmt.QueryTx(ctx, tx)
707+
if err != nil {
708+
return err
709+
}
710+
rows.Close()
711+
if err = rows.Err(); err != nil {
712+
return err
713+
}
702714
}
703-
rows.Close()
704-
if err = rows.Err(); err != nil {
705-
return err
715+
if o.config.writesUseSelectForUpdate {
716+
rows, err := o.sfuStmt.QueryTx(ctx, tx, sfuArgs...)
717+
if err != nil {
718+
return err
719+
}
720+
rows.Close()
721+
if err = rows.Err(); err != nil {
722+
return err
723+
}
706724
}
707-
// Simulate a transaction that does other work between the SFU and write.
725+
// Simulate a transaction that does other work between the sel1 / SFU and write.
708726
time.Sleep(o.config.sfuDelay)
709727
if _, err = o.writeStmt.ExecTx(ctx, tx, writeArgs...); err != nil {
710728
// Multiple write transactions can contend and encounter

0 commit comments

Comments
 (0)