@@ -44,6 +44,13 @@ var BufferedWritesEnabled = settings.RegisterBoolSetting(
 	settings.WithPublic,
 )
 
+var bufferedWritesScanTransformEnabled = settings.RegisterBoolSetting(
+	settings.ApplicationLevel,
+	"kv.transaction.write_buffering.transformations.scans.enabled",
+	"if enabled, locking scans and reverse scans with replicated durability are transformed to unreplicated durability",
+	true,
+)
+
 var bufferedWritesMaxBufferSize = settings.RegisterByteSizeSetting(
 	settings.ApplicationLevel,
 	"kv.transaction.write_buffering.max_buffer_size",
@@ -251,7 +258,11 @@ func (twb *txnWriteBuffer) SendLocked(
 	// budget. If it will, we shouldn't buffer writes from the current batch,
 	// and flush the buffer.
 	maxSize := bufferedWritesMaxBufferSize.Get(&twb.st.SV)
-	bufSize := twb.estimateSize(ba) + twb.bufferSize
+	// We check if scan transforms are enabled once and use that answer until the
+	// end of SendLocked.
+	transformScans := bufferedWritesScanTransformEnabled.Get(&twb.st.SV)
+	bufSize := twb.estimateSize(ba, transformScans) + twb.bufferSize
+
 	// NB: if bufferedWritesMaxBufferSize is set to 0 then we effectively disable
 	// any buffer limiting.
 	if maxSize != 0 && bufSize > maxSize {
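The flag is deliberately read once per `SendLocked` so that the size estimate and the later transformation agree even if the setting flips mid-request. A standalone sketch of that snapshot-once pattern (plain Go, not CockroachDB code):

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// scanTransformEnabled stands in for the cluster setting; it can change at any time.
var scanTransformEnabled atomic.Bool

func processBatch(reqs []string) {
	// Snapshot the flag once; every later decision in this batch uses the same
	// answer, mirroring how SendLocked threads transformScans through.
	transformScans := scanTransformEnabled.Load()
	for _, r := range reqs {
		if transformScans {
			fmt.Println("transform:", r)
		} else {
			fmt.Println("pass through:", r)
		}
	}
}

func main() {
	scanTransformEnabled.Store(true)
	processBatch([]string{"scan[a,b)", "reverse-scan[c,d)"})
}
```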
@@ -268,7 +279,7 @@ func (twb *txnWriteBuffer) SendLocked(
 		return nil, kvpb.NewError(err)
 	}
 
-	transformedBa, rr, pErr := twb.applyTransformations(ctx, ba)
+	transformedBa, rr, pErr := twb.applyTransformations(ctx, ba, transformScans)
 	if pErr != nil {
 		return nil, pErr
 	}
@@ -429,7 +440,7 @@ func unsupportedOptionError(m kvpb.Method, option string) error {
 
 // estimateSize returns a conservative estimate by which the buffer will grow in
 // size if the writes from the supplied batch request are buffered.
-func (twb *txnWriteBuffer) estimateSize(ba *kvpb.BatchRequest) int64 {
+func (twb *txnWriteBuffer) estimateSize(ba *kvpb.BatchRequest, transformScans bool) int64 {
 	var scratch bufferedWrite
 	estimate := int64(0)
 	scratch.vals = make([]bufferedValue, 1)
@@ -478,7 +489,36 @@ func (twb *txnWriteBuffer) estimateSize(ba *kvpb.BatchRequest) int64 {
 			if t.MustAcquireExclusiveLock {
 				estimate += lockKeyInfoSize
 			}
+		case *kvpb.ScanRequest:
+			// ScanRequest can potentially consume up to t.TargetBytes (or an
+			// unbounded number of bytes if TargetBytes is 0). When set, TargetBytes
+			// will typically be set much larger than the default buffer size, so if
+			// we were to estimate the size based on TargetBytes we would always flush
+			// the buffer. Here, we assume at least 1 key will be returned that is
+			// about the size of the scan start boundary. We try to protect from large
+			// buffer overflows by transforming the batch's MaxSpanRequestKeys later.
+			shouldTransform := t.KeyLockingStrength > lock.None && t.KeyLockingDurability == lock.Replicated
+			shouldTransform = shouldTransform && transformScans
+			if shouldTransform {
+				scratch.key = t.Key
+				scratch.vals[0] = bufferedValue{
+					seq: t.Sequence,
+				}
+				estimate += scratch.size() + lockKeyInfoSize
+			}
+		case *kvpb.ReverseScanRequest:
+			// See the comment on the ScanRequest case for more details.
+			shouldTransform := t.KeyLockingStrength > lock.None && t.KeyLockingDurability == lock.Replicated
+			shouldTransform = shouldTransform && transformScans
+			if shouldTransform {
+				scratch.key = t.Key
+				scratch.vals[0] = bufferedValue{
+					seq: t.Sequence,
+				}
+				estimate += scratch.size() + lockKeyInfoSize
+			}
 		}
+
 		// No other request is buffered.
 	}
 	return estimate
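The new cases charge the buffer for a single key (about the size of the scan's start key) plus lock metadata rather than for `TargetBytes`. A standalone sketch of that accounting, with made-up overhead constants standing in for `scratch.size()` and `lockKeyInfoSize`:

```go
package main

import "fmt"

const (
	assumedEntryOverhead   = 64 // stand-in for the bufferedWrite/bufferedValue overhead
	assumedLockKeyInfoSize = 48 // stand-in for lockKeyInfoSize
)

// estimateLockingScan mirrors the shape of the estimate: one key plus fixed
// lock metadata, regardless of how many bytes the scan may ultimately return.
func estimateLockingScan(startKey []byte) int64 {
	return int64(len(startKey)) + assumedEntryOverhead + assumedLockKeyInfoSize
}

func main() {
	// A scan whose start key encodes to 16 bytes is charged 128 bytes up front,
	// even if TargetBytes would allow megabytes of results.
	fmt.Println(estimateLockingScan(make([]byte, 16)))
}
```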
@@ -751,22 +791,26 @@ func (twb *txnWriteBuffer) closeLocked() {}
 //
 // 4. Scans are always sent to the KV layer, but if the key span being scanned
 // overlaps with any buffered writes, then the response from the KV layer needs
-// to be merged with buffered writes. These are collected as requestRecords.
+// to be merged with buffered writes. These are collected as requestRecords. If
+// the Scan is a locking scan with a replicated durability, it is transformed to
+// an unreplicated durability and information about the lock is added to the
+// buffer.
 //
 // 5. ReverseScans, similar to scans, are also always sent to the KV layer and
 // their response needs to be merged with any buffered writes. The only
 // difference is the direction in which the buffer is iterated when doing the
-// merge. As a result, they're also collected as requestRecords.
+// merge. As a result, they're also collected as requestRecords. If the
+// ReverseScan is a locking scan with a replicated durability, it is transformed
+// to an unreplicated durability and information about the lock is added to the
+// buffer.
 //
 // 6. Conditional Puts are decomposed into a locking Get followed by a Put. The
 // Put is buffered locally if the condition evaluates successfully using the
 // Get's response. Otherwise, a ConditionFailedError is returned. We elide the
 // locking Get request if it can be served from the buffer (i.e if a lock of
 // sufficient strength has been acquired and a value has been buffered).
-//
-// TODO(arul): Augment this comment as these expand.
 func (twb *txnWriteBuffer) applyTransformations(
-	ctx context.Context, ba *kvpb.BatchRequest,
+	ctx context.Context, ba *kvpb.BatchRequest, transformScans bool,
 ) (*kvpb.BatchRequest, requestRecords, *kvpb.Error) {
 	baRemote := ba.ShallowCopy()
 	// TODO(arul): We could improve performance here by pre-allocating
@@ -776,6 +820,8 @@ func (twb *txnWriteBuffer) applyTransformations(
 	baRemote.Requests = nil
 
 	rr := make(requestRecords, 0, len(ba.Requests))
+	hasTransformedLockingScan := false
+	transformedLockingScanKeySizeEstimate := 0
 	for i, ru := range ba.Requests {
 		req := ru.GetInner()
 		// Track a requestRecord for the request regardless of the type, and
@@ -887,11 +933,45 @@ func (twb *txnWriteBuffer) applyTransformations(
 				baRemote.Requests = append(baRemote.Requests, ru)
 			}
 
-		case *kvpb.ScanRequest, *kvpb.ReverseScanRequest:
+		case *kvpb.ScanRequest:
 			// Regardless of whether the scan overlaps with any writes in the buffer
 			// or not, we must send the request to the KV layer. We can't know for
 			// sure that there's nothing else to read.
-			baRemote.Requests = append(baRemote.Requests, ru)
+			shouldTransform := t.KeyLockingStrength > lock.None && t.KeyLockingDurability == lock.Replicated
+			shouldTransform = shouldTransform && transformScans
+			if shouldTransform {
+				var scanReqU kvpb.RequestUnion
+				scanReq := t.ShallowCopy().(*kvpb.ScanRequest)
+				scanReq.KeyLockingDurability = lock.Unreplicated
+				scanReqU.MustSetInner(scanReq)
+
+				baRemote.Requests = append(baRemote.Requests, scanReqU)
+				record.transformed = true
+				hasTransformedLockingScan = true
+				transformedLockingScanKeySizeEstimate = max(transformedLockingScanKeySizeEstimate, len(scanReq.Key))
+			} else {
+				baRemote.Requests = append(baRemote.Requests, ru)
+			}
+
+		case *kvpb.ReverseScanRequest:
+			// Regardless of whether the reverse scan overlaps with any writes in the
+			// buffer or not, we must send the request to the KV layer. We can't know
+			// for sure that there's nothing else to read.
+			shouldTransform := t.KeyLockingStrength > lock.None && t.KeyLockingDurability == lock.Replicated
+			shouldTransform = shouldTransform && transformScans
+			if shouldTransform {
+				var rScanReqU kvpb.RequestUnion
+				rScanReq := t.ShallowCopy().(*kvpb.ReverseScanRequest)
+				rScanReq.KeyLockingDurability = lock.Unreplicated
+				rScanReqU.MustSetInner(rScanReq)
+
+				baRemote.Requests = append(baRemote.Requests, rScanReqU)
+				hasTransformedLockingScan = true
+				transformedLockingScanKeySizeEstimate = max(transformedLockingScanKeySizeEstimate, len(rScanReq.Key))
+				record.transformed = true
+			} else {
+				baRemote.Requests = append(baRemote.Requests, ru)
+			}
 
 		case *kvpb.QueryLocksRequest, *kvpb.LeaseInfoRequest:
 			// These requests don't interact with buffered writes, so we simply
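Note that the request is shallow-copied before the durability is downgraded, so the caller's original batch (and `rr.origRequest`) still records the replicated-durability intent. A standalone sketch of that copy-before-mutate pattern, with assumed types in place of `kvpb`:

```go
package main

import "fmt"

type durability int

const (
	replicated durability = iota
	unreplicated
)

type scanRequest struct {
	key        string
	durability durability
}

// transformForSend copies the request and downgrades only the copy, mirroring
// ShallowCopy() followed by the KeyLockingDurability rewrite in the patch.
func transformForSend(orig *scanRequest) *scanRequest {
	cp := *orig
	cp.durability = unreplicated
	return &cp
}

func main() {
	orig := &scanRequest{key: "a", durability: replicated}
	sent := transformForSend(orig)
	fmt.Println(orig.durability, sent.durability) // 0 1: the original request is untouched
}
```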
@@ -903,9 +983,84 @@ func (twb *txnWriteBuffer) applyTransformations(
 		}
 		rr = append(rr, record)
 	}
+
+	if hasTransformedLockingScan {
+		twb.maybeMutateBatchMaxSpanRequestKeys(ctx, baRemote, transformedLockingScanKeySizeEstimate)
+	}
+
 	return baRemote, rr, nil
 }
 
+// maybeMutateBatchMaxSpanRequestKeys limits MaxSpanRequestKeys to protect from the
+// need to buffer an unbounded number of keys.
+//
+// SQL sets TargetBytes to 0, allowing for parallel scans, when it believes that
+// the constraints on the scan limit the result set to less than 10,000 rows.
+// This is a large number of rows, but, in order to preserve the ability for
+// parallel scans, we don't limit the request keys if TargetBytes is 0.
+//
+// NB: This does not currently take into account the fact that other requests in
+// the buffer may also consume some of the remaining buffer size. That is
+// probably OK since SQL-generated transactions won't have reads and writes in
+// the same batch.
+func (twb *txnWriteBuffer) maybeMutateBatchMaxSpanRequestKeys(
+	ctx context.Context, ba *kvpb.BatchRequest, transformedLockingScanKeySizeEstimate int,
+) {
+	if ba.TargetBytes == 0 && ba.MaxSpanRequestKeys == 0 {
+		log.VEventf(ctx, 2, "allowing unbounded transformed locking scan because TargetBytes=0 and MaxSpanRequestKeys=0")
+		return
+	}
+
+	// If the user has disabled a maximum buffer size, respect that.
+	maxSize := bufferedWritesMaxBufferSize.Get(&twb.st.SV)
+	if maxSize == 0 {
+		log.VEventf(ctx, 2, "allowing unbounded transformed locking scan because %s=0", bufferedWritesMaxBufferSize.Name())
+		return
+	}
+
+	// According to the documentation, MaxSpanRequestKeys is not supported for all
+	// requests. Here we check against all requests that are allowed through
+	// (*txnWriteBuffer).validateRequests, but we should not see most of these
+	// requests in a transformed batch. Further, SQL does not generate batches
+	// that include both reads and writes.
+	for _, ru := range ba.Requests {
+		req := ru.GetInner()
+		switch req.(type) {
+		case *kvpb.ConditionalPutRequest, *kvpb.DeleteRequest,
+			*kvpb.PutRequest, *kvpb.LeaseInfoRequest:
+			log.VEventf(ctx, 2, "allowing unbounded transformed locking scan because transformed batch contains a %s request",
+				req.Method())
+			return
+		case *kvpb.GetRequest, *kvpb.ScanRequest, *kvpb.ReverseScanRequest, *kvpb.QueryLocksRequest:
+			continue
+		default:
+			log.VEventf(ctx, 2, "allowing unbounded transformed locking scan because transformed batch contains an unexpected %s request",
+				req.Method())
+			return
+		}
+	}
+
+	var bufferRemaining int64
+	if twb.bufferSize > maxSize {
+		// Somehow the bufferSize has already grown beyond the max, perhaps because
+		// the setting has changed. We could bail out and flush our batch, but for
+		// simplicity we keep going.
+		const fallbackTarget = 1 << 20 // 1 MB
+		bufferRemaining = min(fallbackTarget, maxSize)
+	} else {
+		bufferRemaining = maxSize - twb.bufferSize
+	}
+
+	perKeyEstimate := lockKeyInfoSize + int64(transformedLockingScanKeySizeEstimate)
+	targetKeys := max(bufferRemaining/perKeyEstimate, 1)
+	if ba.MaxSpanRequestKeys == 0 || ba.MaxSpanRequestKeys > targetKeys {
+		log.VEventf(ctx, 2, "changing MaxSpanRequestKeys from %d to %d because of a transformed locking scan",
+			ba.MaxSpanRequestKeys,
+			targetKeys)
+		ba.MaxSpanRequestKeys = targetKeys
+	}
+}
+
 // seekItemForSpan returns a bufferedWrite appropriate for use with a
 // write-buffer iterator. Point lookups should use a nil end key.
 func (twb *txnWriteBuffer) seekItemForSpan(key, endKey roachpb.Key) *bufferedWrite {
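A worked example of the clamp: with a 4 MiB buffer budget, 1 MiB already buffered, 32-byte keys, and 48 bytes of lock metadata per key (all assumed numbers), the batch would be limited to roughly 39k keys. Standalone sketch:

```go
package main

import "fmt"

// targetKeys mirrors the arithmetic in maybeMutateBatchMaxSpanRequestKeys:
// divide the remaining buffer budget by the per-key cost, never going below 1.
func targetKeys(maxSize, bufferSize, keySizeEstimate, lockKeyInfoSize int64) int64 {
	bufferRemaining := maxSize - bufferSize
	perKeyEstimate := lockKeyInfoSize + keySizeEstimate
	return max(bufferRemaining/perKeyEstimate, 1)
}

func main() {
	// (4 MiB - 1 MiB) / (48 + 32) bytes per key = 39321 keys.
	fmt.Println(targetKeys(4<<20, 1<<20, 32, 48))
}
```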
@@ -1047,6 +1202,7 @@ func (twb *txnWriteBuffer) mergeBufferAndResp(
 		// larger of the two keys.
 		cmp = cmp * -1
 	}
+
 	switch cmp {
 	case -1:
 		// The key in the buffer is less than the next key in the server's
@@ -1313,18 +1469,42 @@ func (rr requestRecord) toResp(
 		}
 
 	case *kvpb.ScanRequest:
-		scanResp, err := twb.mergeWithScanResp(
-			rr.origRequest.(*kvpb.ScanRequest), br.GetInner().(*kvpb.ScanResponse),
-		)
+		origReq := rr.origRequest.(*kvpb.ScanRequest)
+		resp := br.GetInner().(*kvpb.ScanResponse)
+		if rr.transformed {
+			// We iterate over the ScanResponse here since we cannot mutate the write
+			// buffer while iterating over it.
+			respIter := newScanRespIter(origReq, resp)
+			for ; respIter.valid(); respIter.next() {
+				twb.addDurableLockedReadToBuffer(respIter.peekKey(), &bufferedDurableLockAcquisition{
+					str: req.KeyLockingStrength,
+					seq: req.Sequence,
+					ts:  txn.ReadTimestamp,
+				})
+			}
+		}
+		scanResp, err := twb.mergeWithScanResp(origReq, resp)
 		if err != nil {
 			return kvpb.ResponseUnion{}, kvpb.NewError(err)
 		}
 		ru.MustSetInner(scanResp)
 
 	case *kvpb.ReverseScanRequest:
-		reverseScanResp, err := twb.mergeWithReverseScanResp(
-			rr.origRequest.(*kvpb.ReverseScanRequest), br.GetInner().(*kvpb.ReverseScanResponse),
-		)
+		origReq := rr.origRequest.(*kvpb.ReverseScanRequest)
+		resp := br.GetInner().(*kvpb.ReverseScanResponse)
+		if rr.transformed {
+			// We iterate over the ReverseScanResponse here since we cannot mutate the
+			// write buffer while iterating over it.
+			respIter := newReverseScanRespIter(origReq, resp)
+			for ; respIter.valid(); respIter.next() {
+				twb.addDurableLockedReadToBuffer(respIter.peekKey(), &bufferedDurableLockAcquisition{
+					str: req.KeyLockingStrength,
+					seq: req.Sequence,
+					ts:  txn.ReadTimestamp,
+				})
+			}
+		}
+		reverseScanResp, err := twb.mergeWithReverseScanResp(origReq, resp)
 		if err != nil {
 			return kvpb.ResponseUnion{}, kvpb.NewError(err)
 		}
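For transformed scans, `toResp` walks the response and records a lock acquisition for every returned key before merging with the buffer. A standalone sketch of that bookkeeping, with assumed types in place of the real buffer and `bufferedDurableLockAcquisition`:

```go
package main

import "fmt"

type lockRecord struct {
	strength string
	seq      int
	ts       int64
}

// recordScanLocks tags every key returned by the (transformed) scan with the
// request's locking strength/sequence and the transaction's read timestamp.
func recordScanLocks(buf map[string]lockRecord, respKeys []string, strength string, seq int, readTS int64) {
	for _, k := range respKeys {
		buf[k] = lockRecord{strength: strength, seq: seq, ts: readTS}
	}
}

func main() {
	buf := map[string]lockRecord{}
	recordScanLocks(buf, []string{"a", "b", "c"}, "exclusive", 3, 100)
	fmt.Println(len(buf)) // 3 keys now carry buffered lock info
}
```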
@@ -2174,6 +2354,8 @@ func newReverseScanRespIter(
 //
 // peekKey should only be called if the iterator is in valid state (i.e.
 // valid() returned true).
+//
+// NB: Callers assume that the returned key can be retained.
 func (s *respIter) peekKey() roachpb.Key {
 	if s.scanFormat == kvpb.KEY_VALUES {
 		return s.rows[s.rowsIndex].Key
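The new NB matters because the lock-buffering loop above stores the keys returned by `peekKey` directly. A standalone illustration of the aliasing hazard the comment rules out: if the iterator reused a scratch buffer, callers would have to copy before retaining.

```go
package main

import "fmt"

func main() {
	// Unsafe: the caller retains a slice that aliases a reused scratch buffer.
	scratch := []byte("aaa")
	retained := scratch
	copy(scratch, "bbb")          // iterator reuses scratch for the next key
	fmt.Println(string(retained)) // "bbb": the retained key silently changed

	// Safe pattern when retention is NOT guaranteed: copy before keeping.
	scratch2 := []byte("aaa")
	retainedCopy := append([]byte(nil), scratch2...)
	copy(scratch2, "bbb")
	fmt.Println(string(retainedCopy)) // "aaa"
}
```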