@@ -27,8 +27,12 @@ import (
27
27
)
28
28
29
29
func makeMockTxnWriteBuffer (
30
- st * cluster.Settings , optionalMetrics ... TxnMetrics ,
31
- ) (txnWriteBuffer , * mockLockedSender ) {
30
+ ctx context.Context , optionalMetrics ... TxnMetrics ,
31
+ ) (txnWriteBuffer , * mockLockedSender , * cluster.Settings ) {
32
+ st := cluster .MakeClusterSettings ()
33
+ bufferedWritesScanTransformEnabled .Override (ctx , & st .SV , true )
34
+ bufferedWritesMaxBufferSize .Override (ctx , & st .SV , defaultBufferSize )
35
+
32
36
var metrics TxnMetrics
33
37
if len (optionalMetrics ) > 0 {
34
38
metrics = optionalMetrics [0 ]
@@ -41,7 +45,7 @@ func makeMockTxnWriteBuffer(
41
45
wrapped : mockSender ,
42
46
txnMetrics : & metrics ,
43
47
st : st ,
44
- }, mockSender
48
+ }, mockSender , st
45
49
}
46
50
47
51
func getArgs (key roachpb.Key ) * kvpb.GetRequest {
@@ -99,7 +103,7 @@ func TestTxnWriteBufferBuffersBlindWrites(t *testing.T) {
99
103
defer leaktest .AfterTest (t )()
100
104
defer log .Scope (t ).Close (t )
101
105
ctx := context .Background ()
102
- twb , mockSender := makeMockTxnWriteBuffer (cluster . MakeClusterSettings () )
106
+ twb , mockSender , _ := makeMockTxnWriteBuffer (ctx )
103
107
104
108
txn := makeTxnProto ()
105
109
txn .Sequence = 1
@@ -172,7 +176,7 @@ func TestTxnWriteBufferWritesToSameKey(t *testing.T) {
172
176
defer leaktest .AfterTest (t )()
173
177
defer log .Scope (t ).Close (t )
174
178
ctx := context .Background ()
175
- twb , mockSender := makeMockTxnWriteBuffer (cluster . MakeClusterSettings () )
179
+ twb , mockSender , _ := makeMockTxnWriteBuffer (ctx )
176
180
177
181
txn := makeTxnProto ()
178
182
txn .Sequence = 1
@@ -270,7 +274,7 @@ func TestTxnWriteBufferBlindWritesIncludingOtherRequests(t *testing.T) {
270
274
defer leaktest .AfterTest (t )()
271
275
defer log .Scope (t ).Close (t )
272
276
ctx := context .Background ()
273
- twb , mockSender := makeMockTxnWriteBuffer (cluster . MakeClusterSettings () )
277
+ twb , mockSender , _ := makeMockTxnWriteBuffer (ctx )
274
278
275
279
txn := makeTxnProto ()
276
280
txn .Sequence = 1
@@ -365,7 +369,7 @@ func TestTxnWriteBufferCorrectlyAdjustsFlushErrors(t *testing.T) {
365
369
} {
366
370
t .Run (fmt .Sprintf ("errIdx=%d" , errIdx ), func (t * testing.T ) {
367
371
ctx := context .Background ()
368
- twb , mockSender := makeMockTxnWriteBuffer (cluster . MakeClusterSettings () )
372
+ twb , mockSender , _ := makeMockTxnWriteBuffer (ctx )
369
373
370
374
txn := makeTxnProto ()
371
375
txn .Sequence = 1
@@ -446,7 +450,7 @@ func TestTxnWriteBufferCorrectlyAdjustsErrorsAfterBuffering(t *testing.T) {
446
450
} {
447
451
t .Run (fmt .Sprintf ("errIdx=%d" , errIdx ), func (t * testing.T ) {
448
452
ctx := context .Background ()
449
- twb , mockSender := makeMockTxnWriteBuffer (cluster . MakeClusterSettings () )
453
+ twb , mockSender , _ := makeMockTxnWriteBuffer (ctx )
450
454
451
455
txn := makeTxnProto ()
452
456
txn .Sequence = 1
@@ -536,7 +540,7 @@ func TestTxnWriteBufferServesPointReadsLocally(t *testing.T) {
536
540
defer leaktest .AfterTest (t )()
537
541
defer log .Scope (t ).Close (t )
538
542
ctx := context .Background ()
539
- twb , mockSender := makeMockTxnWriteBuffer (cluster . MakeClusterSettings () )
543
+ twb , mockSender , _ := makeMockTxnWriteBuffer (ctx )
540
544
541
545
putAtSeq := func (key roachpb.Key , val string , seq enginepb.TxnSeq ) {
542
546
txn := makeTxnProto ()
@@ -730,7 +734,7 @@ func TestTxnWriteBufferServesPointReadsAfterScan(t *testing.T) {
730
734
defer leaktest .AfterTest (t )()
731
735
defer log .Scope (t ).Close (t )
732
736
ctx := context .Background ()
733
- twb , mockSender := makeMockTxnWriteBuffer (cluster . MakeClusterSettings () )
737
+ twb , mockSender , _ := makeMockTxnWriteBuffer (ctx )
734
738
735
739
txn := makeTxnProto ()
736
740
txn .Sequence = 10
@@ -790,7 +794,7 @@ func TestTxnWriteBufferServesOverlappingReadsCorrectly(t *testing.T) {
790
794
defer leaktest .AfterTest (t )()
791
795
defer log .Scope (t ).Close (t )
792
796
ctx := context .Background ()
793
- twb , mockSender := makeMockTxnWriteBuffer (cluster . MakeClusterSettings () )
797
+ twb , mockSender , _ := makeMockTxnWriteBuffer (ctx )
794
798
795
799
putAtSeq := func (key roachpb.Key , val string , seq enginepb.TxnSeq ) {
796
800
txn := makeTxnProto ()
@@ -1015,7 +1019,7 @@ func TestTxnWriteBufferLockingGetRequests(t *testing.T) {
1015
1019
defer leaktest .AfterTest (t )()
1016
1020
defer log .Scope (t ).Close (t )
1017
1021
ctx := context .Background ()
1018
- twb , mockSender := makeMockTxnWriteBuffer (cluster . MakeClusterSettings () )
1022
+ twb , mockSender , _ := makeMockTxnWriteBuffer (ctx )
1019
1023
1020
1024
txn := makeTxnProto ()
1021
1025
txn .Sequence = 10
@@ -1123,7 +1127,7 @@ func TestTxnWriteBufferDecomposesConditionalPuts(t *testing.T) {
1123
1127
1124
1128
testutils .RunTrueAndFalse (t , "condEvalSuccessful" , func (t * testing.T , condEvalSuccessful bool ) {
1125
1129
ctx := context .Background ()
1126
- twb , mockSender := makeMockTxnWriteBuffer (cluster . MakeClusterSettings () )
1130
+ twb , mockSender , _ := makeMockTxnWriteBuffer (ctx )
1127
1131
twb .testingOverrideCPutEvalFn = func (expBytes []byte , actVal * roachpb.Value , actValPresent bool , allowNoExisting bool ) * kvpb.ConditionFailedError {
1128
1132
if condEvalSuccessful {
1129
1133
return nil
@@ -1203,7 +1207,7 @@ func TestTxnWriteBufferDecomposesConditionalPutsExpectingNoRow(t *testing.T) {
1203
1207
defer leaktest .AfterTest (t )()
1204
1208
defer log .Scope (t ).Close (t )
1205
1209
ctx := context .Background ()
1206
- twb , mockSender := makeMockTxnWriteBuffer (cluster . MakeClusterSettings () )
1210
+ twb , mockSender , _ := makeMockTxnWriteBuffer (ctx )
1207
1211
1208
1212
twb .testingOverrideCPutEvalFn = func (expBytes []byte , actVal * roachpb.Value , actValPresent bool , allowNoExisting bool ) * kvpb.ConditionFailedError {
1209
1213
return nil
@@ -1265,7 +1269,7 @@ func TestTxnWriteBufferRespectsMustAcquireExclusiveLock(t *testing.T) {
1265
1269
defer leaktest .AfterTest (t )()
1266
1270
defer log .Scope (t ).Close (t )
1267
1271
ctx := context .Background ()
1268
- twb , mockSender := makeMockTxnWriteBuffer (cluster . MakeClusterSettings () )
1272
+ twb , mockSender , _ := makeMockTxnWriteBuffer (ctx )
1269
1273
1270
1274
txn := makeTxnProto ()
1271
1275
txn .Sequence = 10
@@ -1338,7 +1342,7 @@ func TestTxnWriteBufferMustSortBatchesBySequenceNumber(t *testing.T) {
1338
1342
defer leaktest .AfterTest (t )()
1339
1343
defer log .Scope (t ).Close (t )
1340
1344
ctx := context .Background ()
1341
- twb , mockSender := makeMockTxnWriteBuffer (cluster . MakeClusterSettings () )
1345
+ twb , mockSender , _ := makeMockTxnWriteBuffer (ctx )
1342
1346
1343
1347
txn := makeTxnProto ()
1344
1348
txn .Sequence = 10
@@ -1394,10 +1398,9 @@ func TestTxnWriteBufferMustSortBatchesBySequenceNumber(t *testing.T) {
1394
1398
func TestTxnWriteBufferEstimateSize (t * testing.T ) {
1395
1399
defer leaktest .AfterTest (t )()
1396
1400
defer log .Scope (t ).Close (t )
1397
- twb , _ := makeMockTxnWriteBuffer ( cluster . MakeClusterSettings () )
1401
+ ctx := context . Background ( )
1398
1402
1399
- st := cluster .MakeTestingClusterSettings ()
1400
- twb .st = st
1403
+ twb , _ , _ := makeMockTxnWriteBuffer (ctx )
1401
1404
1402
1405
txn := makeTxnProto ()
1403
1406
txn .Sequence = 10
@@ -1497,8 +1500,7 @@ func TestTxnWriteBufferFlushesWhenOverBudget(t *testing.T) {
1497
1500
defer leaktest .AfterTest (t )()
1498
1501
defer log .Scope (t ).Close (t )
1499
1502
ctx := context .Background ()
1500
- st := cluster .MakeTestingClusterSettings ()
1501
- twb , mockSender := makeMockTxnWriteBuffer (st )
1503
+ twb , mockSender , st := makeMockTxnWriteBuffer (ctx )
1502
1504
1503
1505
txn := makeTxnProto ()
1504
1506
txn .Sequence = 10
@@ -1615,7 +1617,6 @@ func TestTxnWriteBufferLimitsSizeOfScans(t *testing.T) {
1615
1617
defer log .Scope (t ).Close (t )
1616
1618
1617
1619
ctx := context .Background ()
1618
- st := cluster .MakeTestingClusterSettings ()
1619
1620
1620
1621
keyA , keyB := roachpb .Key ("a" ), roachpb .Key ("b" )
1621
1622
@@ -1657,10 +1658,11 @@ func TestTxnWriteBufferLimitsSizeOfScans(t *testing.T) {
1657
1658
}
1658
1659
name := fmt .Sprintf ("%s/%s" , req , tc .name )
1659
1660
t .Run (name , func (t * testing.T ) {
1660
- twb , mockSender := makeMockTxnWriteBuffer (st )
1661
+ twb , mockSender , st := makeMockTxnWriteBuffer (ctx )
1661
1662
txn := makeTxnProto ()
1662
1663
txn .Sequence = 10
1663
1664
1665
+ bufferedWritesScanTransformEnabled .Override (ctx , & st .SV , true )
1664
1666
bufferedWritesMaxBufferSize .Override (ctx , & st .SV , tc .bufferSize )
1665
1667
1666
1668
ba := & kvpb.BatchRequest {Header : kvpb.Header {Txn : & txn }}
@@ -1695,7 +1697,7 @@ func TestTxnWriteBufferLimitsSizeOfScans(t *testing.T) {
1695
1697
}
1696
1698
}
1697
1699
t .Run ("no mutation when an unsupported request is in batch" , func (t * testing.T ) {
1698
- twb , mockSender := makeMockTxnWriteBuffer (st )
1700
+ twb , mockSender , st := makeMockTxnWriteBuffer (ctx )
1699
1701
txn := makeTxnProto ()
1700
1702
txn .Sequence = 10
1701
1703
@@ -1799,7 +1801,7 @@ func TestTxnWriteBufferFlushesIfBatchRequiresFlushing(t *testing.T) {
1799
1801
1800
1802
for _ , tc := range testCases {
1801
1803
t .Run (tc .name , func (t * testing.T ) {
1802
- twb , mockSender := makeMockTxnWriteBuffer (cluster . MakeClusterSettings () )
1804
+ twb , mockSender , _ := makeMockTxnWriteBuffer (ctx )
1803
1805
1804
1806
txn := makeTxnProto ()
1805
1807
txn .Sequence = 10
@@ -1889,7 +1891,7 @@ func TestTxnWriteBufferRollbackToSavepoint(t *testing.T) {
1889
1891
defer leaktest .AfterTest (t )()
1890
1892
defer log .Scope (t ).Close (t )
1891
1893
ctx := context .Background ()
1892
- twb , mockSender := makeMockTxnWriteBuffer (cluster . MakeClusterSettings () )
1894
+ twb , mockSender , _ := makeMockTxnWriteBuffer (ctx )
1893
1895
1894
1896
txn := makeTxnProto ()
1895
1897
txn .Sequence = 10
@@ -1997,7 +1999,7 @@ func TestRollbackNeverHeldLock(t *testing.T) {
1997
1999
defer leaktest .AfterTest (t )()
1998
2000
defer log .Scope (t ).Close (t )
1999
2001
ctx := context .Background ()
2000
- twb , mockSender := makeMockTxnWriteBuffer (cluster . MakeClusterSettings () )
2002
+ twb , mockSender , _ := makeMockTxnWriteBuffer (ctx )
2001
2003
2002
2004
txn := makeTxnProto ()
2003
2005
txn .Sequence = 10
@@ -2037,7 +2039,7 @@ func TestTxnWriteBufferFlushesAfterDisabling(t *testing.T) {
2037
2039
defer leaktest .AfterTest (t )()
2038
2040
defer log .Scope (t ).Close (t )
2039
2041
ctx := context .Background ()
2040
- twb , mockSender := makeMockTxnWriteBuffer (cluster . MakeClusterSettings () )
2042
+ twb , mockSender , _ := makeMockTxnWriteBuffer (ctx )
2041
2043
2042
2044
txn := makeTxnProto ()
2043
2045
txn .Sequence = 1
@@ -2145,7 +2147,7 @@ func TestTxnWriteBufferClearsBufferOnEpochBump(t *testing.T) {
2145
2147
defer leaktest .AfterTest (t )()
2146
2148
defer log .Scope (t ).Close (t )
2147
2149
ctx := context .Background ()
2148
- twb , mockSender := makeMockTxnWriteBuffer (cluster . MakeClusterSettings () )
2150
+ twb , mockSender , _ := makeMockTxnWriteBuffer (ctx )
2149
2151
2150
2152
txn := makeTxnProto ()
2151
2153
txn .Sequence = 1
@@ -2179,7 +2181,7 @@ func TestTxnWriteBufferBatchRequestValidation(t *testing.T) {
2179
2181
defer leaktest .AfterTest (t )()
2180
2182
defer log .Scope (t ).Close (t )
2181
2183
ctx := context .Background ()
2182
- twb , mockSender := makeMockTxnWriteBuffer (cluster . MakeClusterSettings () )
2184
+ twb , mockSender , _ := makeMockTxnWriteBuffer (ctx )
2183
2185
2184
2186
type testCase struct {
2185
2187
name string
@@ -2457,8 +2459,7 @@ func TestTxnWriteBufferHasBufferedAllPrecedingWrites(t *testing.T) {
2457
2459
} {
2458
2460
t .Run (tc .name , func (t * testing.T ) {
2459
2461
ctx := context .Background ()
2460
- st := cluster .MakeTestingClusterSettings ()
2461
- twb , mockSender := makeMockTxnWriteBuffer (st )
2462
+ twb , mockSender , _ := makeMockTxnWriteBuffer (ctx )
2462
2463
2463
2464
if tc .setup != nil {
2464
2465
tc .setup (& twb )
@@ -2506,7 +2507,6 @@ func TestTxnWriteBufferHasBufferedAllPrecedingWrites(t *testing.T) {
2506
2507
func BenchmarkTxnWriteBuffer (b * testing.B ) {
2507
2508
defer leaktest .AfterTest (b )()
2508
2509
ctx := context .Background ()
2509
- ct := cluster .MakeClusterSettings ()
2510
2510
metrics := MakeTxnMetrics (time .Hour )
2511
2511
2512
2512
// Map from kvSize to a slice of keys where the i-th element corresponds to
@@ -2546,7 +2546,7 @@ func BenchmarkTxnWriteBuffer(b *testing.B) {
2546
2546
}
2547
2547
}
2548
2548
makeBuffer := func (kvSize int , txn * roachpb.Transaction , numWrites int ) txnWriteBuffer {
2549
- twb , mockSender := makeMockTxnWriteBuffer (ct , metrics )
2549
+ twb , mockSender , _ := makeMockTxnWriteBuffer (ctx , metrics )
2550
2550
sendFunc := func (ba * kvpb.BatchRequest ) (* kvpb.BatchResponse , * kvpb.Error ) {
2551
2551
br := ba .CreateReply ()
2552
2552
br .Txn = ba .Txn
@@ -2700,7 +2700,7 @@ func TestTxnWriteBufferChecksForExclusionLoss(t *testing.T) {
2700
2700
defer log .Scope (t ).Close (t )
2701
2701
2702
2702
ctx := context .Background ()
2703
- twb , mockSender := makeMockTxnWriteBuffer (cluster . MakeClusterSettings () )
2703
+ twb , mockSender , _ := makeMockTxnWriteBuffer (ctx )
2704
2704
2705
2705
txn := makeTxnProto ()
2706
2706
txn .Sequence = 10
@@ -2804,7 +2804,7 @@ func TestTxnWriteBufferCorrectlyRollsbackExclusionTimestamp(t *testing.T) {
2804
2804
defer log .Scope (t ).Close (t )
2805
2805
2806
2806
ctx := context .Background ()
2807
- twb , mockSender := makeMockTxnWriteBuffer (cluster . MakeClusterSettings () )
2807
+ twb , mockSender , _ := makeMockTxnWriteBuffer (ctx )
2808
2808
2809
2809
txn := makeTxnProto ()
2810
2810
txn .Sequence = 10
@@ -3242,7 +3242,7 @@ func TestTxnWriteBufferElidesUnnecessaryLockingRequests(t *testing.T) {
3242
3242
skip .WithIssue (t , 142977 , "%s requires a value but %s does not buffer its response" , firstReq .name , secondReq .name )
3243
3243
}
3244
3244
3245
- twb , mockSender := makeMockTxnWriteBuffer (cluster . MakeClusterSettings () )
3245
+ twb , mockSender , _ := makeMockTxnWriteBuffer (ctx )
3246
3246
txn := makeTxnProto ()
3247
3247
txn .Sequence = 10
3248
3248
// Send first request and run firstRequest validation
@@ -3490,7 +3490,7 @@ func TestTxnWriteBufferLockingReadsTransformations(t *testing.T) {
3490
3490
3491
3491
for _ , tc := range testCases {
3492
3492
t .Run (tc .name , func (t * testing.T ) {
3493
- twb , mockSender := makeMockTxnWriteBuffer (cluster . MakeClusterSettings () )
3493
+ twb , mockSender , _ := makeMockTxnWriteBuffer (ctx )
3494
3494
txn := makeTxnProto ()
3495
3495
txn .Sequence = 10
3496
3496
@@ -3712,7 +3712,7 @@ func TestTxnWriteBufferLockingGetFlushing(t *testing.T) {
3712
3712
for _ , tc := range testCases {
3713
3713
name := strings .Join (tc .ops , "_" )
3714
3714
t .Run (name , func (t * testing.T ) {
3715
- twb , mockSender := makeMockTxnWriteBuffer (cluster . MakeClusterSettings () )
3715
+ twb , mockSender , _ := makeMockTxnWriteBuffer (ctx )
3716
3716
txn := makeTxnProto ()
3717
3717
txn .Sequence = 10
3718
3718
mockSender .MockSend (func (ba * kvpb.BatchRequest ) (* kvpb.BatchResponse , * kvpb.Error ) {
0 commit comments