@@ -83,6 +83,8 @@ type kv struct {
83
83
sequential bool
84
84
zipfian bool
85
85
sfuDelay time.Duration
86
+ longRunningTxn bool
87
+ longRunningTxnNumWrites int
86
88
splits int
87
89
scatter bool
88
90
secondaryIndex bool
@@ -119,18 +121,20 @@ var kvMeta = workload.Meta{
119
121
g := & kv {}
120
122
g .flags .FlagSet = pflag .NewFlagSet (`kv` , pflag .ContinueOnError )
121
123
g .flags .Meta = map [string ]workload.FlagMeta {
122
- `batch` : {RuntimeOnly : true },
123
- `sfu-wait-delay` : {RuntimeOnly : true },
124
- `sfu-writes` : {RuntimeOnly : true },
125
- `read-percent` : {RuntimeOnly : true },
126
- `span-percent` : {RuntimeOnly : true },
127
- `span-limit` : {RuntimeOnly : true },
128
- `del-percent` : {RuntimeOnly : true },
129
- `splits` : {RuntimeOnly : true },
130
- `scatter` : {RuntimeOnly : true },
131
- `timeout` : {RuntimeOnly : true },
132
- `prepare-read-only` : {RuntimeOnly : true },
133
- `sel1-writes` : {RuntimeOnly : true },
124
+ `batch` : {RuntimeOnly : true },
125
+ `sfu-wait-delay` : {RuntimeOnly : true },
126
+ `sfu-writes` : {RuntimeOnly : true },
127
+ `long-running-txn` : {RuntimeOnly : true },
128
+ `long-running-txn-num-writes` : {RuntimeOnly : true },
129
+ `read-percent` : {RuntimeOnly : true },
130
+ `span-percent` : {RuntimeOnly : true },
131
+ `span-limit` : {RuntimeOnly : true },
132
+ `del-percent` : {RuntimeOnly : true },
133
+ `splits` : {RuntimeOnly : true },
134
+ `scatter` : {RuntimeOnly : true },
135
+ `timeout` : {RuntimeOnly : true },
136
+ `prepare-read-only` : {RuntimeOnly : true },
137
+ `sel1-writes` : {RuntimeOnly : true },
134
138
}
135
139
g .flags .IntVar (& g .batchSize , `batch` , 1 ,
136
140
`Number of blocks to read/insert in a single SQL statement.` )
@@ -187,6 +191,13 @@ var kvMeta = workload.Meta{
187
191
g .flags .BoolVar (& g .prepareReadOnly , `prepare-read-only` , false , `Prepare and perform only read statements.` )
188
192
g .flags .BoolVar (& g .writesUseSelect1 , `sel1-writes` , false ,
189
193
`Use SELECT 1 as the first statement of transactional writes with a sleep after SELECT 1.` )
194
+ g .flags .BoolVar (& g .longRunningTxn , `long-running-txn` , false ,
195
+ `Use a long-running transaction for running lock contention scenarios. If run with ` +
196
+ `--sfu-writes or --sel1-writes, it will use those writes in the long-running transaction; ` +
197
+ `otherwise, it will use regular writes. Each long-running write transaction counts for a` +
198
+ `single write, as measured by --read-percent.` )
199
+ g .flags .IntVar (& g .longRunningTxnNumWrites , `long-running-txn-num-writes` , 10 ,
200
+ `Number of writes in the long-running transaction when using --long-running-txn.` )
190
201
g .connFlags = workload .NewConnFlags (& g .flags )
191
202
return g
192
203
},
@@ -670,23 +681,26 @@ func (o *kvOp) run(ctx context.Context) (retErr error) {
670
681
o .hists .Get (`span` ).Record (elapsed )
671
682
return err
672
683
}
673
- const argCount = 2
674
- writeArgs := make ([]interface {}, argCount * o .config .batchSize )
675
- var sfuArgs []interface {}
676
- if o .config .writesUseSelectForUpdate {
677
- sfuArgs = make ([]interface {}, o .config .batchSize )
678
- }
679
- for i := 0 ; i < o .config .batchSize ; i ++ {
680
- j := i * argCount
681
- writeArgs [j + 0 ] = o .t .getKey (o .g .writeKey ())
682
- if sfuArgs != nil {
683
- sfuArgs [i ] = writeArgs [j ]
684
+ makeWriteBatchArgs := func () ([]interface {}, []interface {}) {
685
+ const argCount = 2
686
+ writeArgs := make ([]interface {}, argCount * o .config .batchSize )
687
+ var sfuArgs []interface {}
688
+ if o .config .writesUseSelectForUpdate {
689
+ sfuArgs = make ([]interface {}, o .config .batchSize )
690
+ }
691
+ for i := 0 ; i < o .config .batchSize ; i ++ {
692
+ j := i * argCount
693
+ writeArgs [j + 0 ] = o .t .getKey (o .g .writeKey ())
694
+ if sfuArgs != nil {
695
+ sfuArgs [i ] = writeArgs [j ]
696
+ }
697
+ writeArgs [j + 1 ] = o .config .randBlock (o .g .rand ())
684
698
}
685
- writeArgs [ j + 1 ] = o . config . randBlock ( o . g . rand ())
699
+ return writeArgs , sfuArgs
686
700
}
687
701
start := timeutil .Now ()
688
702
var err error
689
- if o .config .writesUseSelect1 || o .config .writesUseSelectForUpdate {
703
+ if o .config .writesUseSelect1 || o .config .writesUseSelectForUpdate || o . config . longRunningTxn {
690
704
// We could use crdb.ExecuteTx, but we avoid retries in this workload so
691
705
// that each run call makes 1 attempt, so that rate limiting in workerRun
692
706
// behaves as expected.
@@ -695,44 +709,52 @@ func (o *kvOp) run(ctx context.Context) (retErr error) {
695
709
if err != nil {
696
710
return err
697
711
}
698
-
699
712
defer func () {
700
713
rollbackErr := tx .Rollback (ctx )
701
714
if ! errors .Is (rollbackErr , pgx .ErrTxClosed ) {
702
715
retErr = errors .CombineErrors (retErr , rollbackErr )
703
716
}
704
717
}()
705
- if o .config .writesUseSelect1 {
706
- rows , err := o .sel1Stmt .QueryTx (ctx , tx )
707
- if err != nil {
708
- return err
709
- }
710
- rows .Close ()
711
- if err = rows .Err (); err != nil {
712
- return err
713
- }
718
+ iterations := 1
719
+ if o .config .longRunningTxn {
720
+ iterations = o .config .longRunningTxnNumWrites
714
721
}
715
- if o .config .writesUseSelectForUpdate {
716
- rows , err := o .sfuStmt .QueryTx (ctx , tx , sfuArgs ... )
717
- if err != nil {
718
- return err
722
+ for i := 0 ; i < iterations ; i ++ {
723
+ writeArgs , sfuArgs := makeWriteBatchArgs ()
724
+ if o .config .writesUseSelect1 {
725
+ rows , err := o .sel1Stmt .QueryTx (ctx , tx )
726
+ if err != nil {
727
+ return err
728
+ }
729
+ rows .Close ()
730
+ if err = rows .Err (); err != nil {
731
+ return err
732
+ }
733
+ }
734
+ if o .config .writesUseSelectForUpdate {
735
+ rows , err := o .sfuStmt .QueryTx (ctx , tx , sfuArgs ... )
736
+ if err != nil {
737
+ return err
738
+ }
739
+ rows .Close ()
740
+ if err = rows .Err (); err != nil {
741
+ // The transaction may have experienced an error in the meantime.
742
+ return o .tryHandleWriteErr ("write-write-err" , start , err )
743
+ }
719
744
}
720
- rows .Close ()
721
- if err = rows .Err (); err != nil {
722
- return err
745
+ // Simulate a transaction that does other work between the sel1 / SFU and write.
746
+ time .Sleep (o .config .sfuDelay )
747
+ if _ , err = o .writeStmt .ExecTx (ctx , tx , writeArgs ... ); err != nil {
748
+ // Multiple write transactions can contend and encounter
749
+ // a serialization failure. We swallow such an error.
750
+ return o .tryHandleWriteErr ("write-write-err" , start , err )
723
751
}
724
752
}
725
- // Simulate a transaction that does other work between the sel1 / SFU and write.
726
- time .Sleep (o .config .sfuDelay )
727
- if _ , err = o .writeStmt .ExecTx (ctx , tx , writeArgs ... ); err != nil {
728
- // Multiple write transactions can contend and encounter
729
- // a serialization failure. We swallow such an error.
730
- return o .tryHandleWriteErr ("write-write-err" , start , err )
731
- }
732
753
if err = tx .Commit (ctx ); err != nil {
733
754
return o .tryHandleWriteErr ("write-commit-err" , start , err )
734
755
}
735
756
} else {
757
+ writeArgs , _ := makeWriteBatchArgs ()
736
758
_ , err = o .writeStmt .Exec (ctx , writeArgs ... )
737
759
}
738
760
if err != nil {
0 commit comments