Skip to content

Commit a7a0b62

Browse files
committed
workload/kv: add --sel1-writes
Add a `--sel1-writes` option to the kv workload. Similar to `--sfu-writes`, this option executes `SELECT 1;` before the mutation statement in an explicit transaction, in order to prevent the mutation statement (or the `SELECT FOR UPDATE` statement if `--sfu-writes` is also used) from using automatic transaction-level retries. This is useful when testing the behavior of automatic statement-level retries under read committed isolation. When all three of `--sel1-writes`, `--sfu-writes`, and `--sfu-wait-delay` are used, the transaction looks like: ``` BEGIN; SELECT 1; SELECT k, v FROM kv WHERE k IN ($1) FOR UPDATE; -- sfu-wait-delay -- mutation statement COMMIT; ``` Informs: #145377 Release note: None
1 parent 7eaf7a8 commit a7a0b62

File tree

1 file changed

+27
-9
lines changed

1 file changed

+27
-9
lines changed

pkg/workload/kv/kv.go

Lines changed: 27 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,7 @@ type kv struct {
9393
insertCount int
9494
txnQoS string
9595
prepareReadOnly bool
96+
writesUseSelect1 bool
9697
}
9798

9899
func init() {
@@ -129,6 +130,7 @@ var kvMeta = workload.Meta{
129130
`scatter`: {RuntimeOnly: true},
130131
`timeout`: {RuntimeOnly: true},
131132
`prepare-read-only`: {RuntimeOnly: true},
133+
`sel1-writes`: {RuntimeOnly: true},
132134
}
133135
g.flags.IntVar(&g.batchSize, `batch`, 1,
134136
`Number of blocks to read/insert in a single SQL statement.`)
@@ -178,11 +180,13 @@ var kvMeta = workload.Meta{
178180
g.flags.IntVar(&g.keySize, `key-size`, 0,
179181
`Use string key of appropriate size instead of int`)
180182
g.flags.DurationVar(&g.sfuDelay, `sfu-wait-delay`, 10*time.Millisecond,
181-
`Delay before sfu write transaction commits or aborts`)
183+
`Delay after SFU when using --sfu-writes (or after SELECT 1 when using --sel1-writes).`)
182184
g.flags.StringVar(&g.txnQoS, `txn-qos`, `regular`,
183185
`Set default_transaction_quality_of_service session variable, accepted`+
184186
`values are 'background', 'regular' and 'critical'.`)
185187
g.flags.BoolVar(&g.prepareReadOnly, `prepare-read-only`, false, `Prepare and perform only read statements.`)
188+
g.flags.BoolVar(&g.writesUseSelect1, `sel1-writes`, false,
189+
`Use SELECT 1 as the first statement of transactional writes with a sleep after SELECT 1.`)
186190
g.connFlags = workload.NewConnFlags(&g.flags)
187191
return g
188192
},
@@ -550,6 +554,7 @@ func (w *kv) Ops(
550554
if len(sfuStmtStr) > 0 && !op.config.prepareReadOnly {
551555
op.sfuStmt = op.sr.Define(sfuStmtStr)
552556
}
557+
op.sel1Stmt = op.sr.Define("SELECT 1")
553558
op.spanStmt = op.sr.Define(spanStmtStr)
554559
if w.txnQoS != `regular` {
555560
stmt := op.sr.Define(fmt.Sprintf(
@@ -582,6 +587,7 @@ type kvOp struct {
582587
writeStmt workload.StmtHandle
583588
spanStmt workload.StmtHandle
584589
sfuStmt workload.StmtHandle
590+
sel1Stmt workload.StmtHandle
585591
delStmt workload.StmtHandle
586592
g keyGenerator
587593
t keyTransformer
@@ -680,7 +686,7 @@ func (o *kvOp) run(ctx context.Context) (retErr error) {
680686
}
681687
start := timeutil.Now()
682688
var err error
683-
if o.config.writesUseSelectForUpdate {
689+
if o.config.writesUseSelect1 || o.config.writesUseSelectForUpdate {
684690
// We could use crdb.ExecuteTx, but we avoid retries in this workload so
685691
// that each run call makes 1 attempt, so that rate limiting in workerRun
686692
// behaves as expected.
@@ -696,15 +702,27 @@ func (o *kvOp) run(ctx context.Context) (retErr error) {
696702
retErr = errors.CombineErrors(retErr, rollbackErr)
697703
}
698704
}()
699-
rows, err := o.sfuStmt.QueryTx(ctx, tx, sfuArgs...)
700-
if err != nil {
701-
return err
705+
if o.config.writesUseSelect1 {
706+
rows, err := o.sel1Stmt.QueryTx(ctx, tx)
707+
if err != nil {
708+
return err
709+
}
710+
rows.Close()
711+
if err = rows.Err(); err != nil {
712+
return err
713+
}
702714
}
703-
rows.Close()
704-
if err = rows.Err(); err != nil {
705-
return err
715+
if o.config.writesUseSelectForUpdate {
716+
rows, err := o.sfuStmt.QueryTx(ctx, tx, sfuArgs...)
717+
if err != nil {
718+
return err
719+
}
720+
rows.Close()
721+
if err = rows.Err(); err != nil {
722+
return err
723+
}
706724
}
707-
// Simulate a transaction that does other work between the SFU and write.
725+
// Simulate a transaction that does other work between the sel1 / SFU and write.
708726
time.Sleep(o.config.sfuDelay)
709727
if _, err = o.writeStmt.ExecTx(ctx, tx, writeArgs...); err != nil {
710728
// Multiple write transactions can contend and encounter

0 commit comments

Comments
 (0)