@@ -51,6 +51,13 @@ var bufferedWritesScanTransformEnabled = settings.RegisterBoolSetting(
51
51
metamorphic .ConstantWithTestBool ("kv.transaction.write_buffering.transformations.scans.enabled" , true /* defaultValue */ ),
52
52
)
53
53
54
+ var bufferedWritesGetTransformEnabled = settings .RegisterBoolSetting (
55
+ settings .ApplicationLevel ,
56
+ "kv.transaction.write_buffering.transformations.get.enabled" ,
57
+ "if enabled, locking get requests with replicated durability are transformed to unreplicated durability" ,
58
+ metamorphic .ConstantWithTestBool ("kv.transaction.write_buffering.transformations.get.enabled" , true /* defaultValue */ ),
59
+ )
60
+
54
61
const defaultBufferSize = 1 << 22 // 4MB
55
62
var bufferedWritesMaxBufferSize = settings .RegisterByteSizeSetting (
56
63
settings .ApplicationLevel ,
@@ -222,6 +229,11 @@ type txnWriteBuffer struct {
222
229
testingOverrideCPutEvalFn func (expBytes []byte , actVal * roachpb.Value , actValPresent bool , allowNoExisting bool ) * kvpb.ConditionFailedError
223
230
}
224
231
232
+ type transformConfig struct {
233
+ transformScans bool
234
+ transformGets bool
235
+ }
236
+
225
237
type pipelineEnabler interface {
226
238
enableImplicitPipelining ()
227
239
}
@@ -274,17 +286,20 @@ func (twb *txnWriteBuffer) SendLocked(
274
286
275
287
// We check if scan transforms are enabled once and use that answer until the
276
288
// end of SendLocked.
277
- transformScans := bufferedWritesScanTransformEnabled .Get (& twb .st .SV )
289
+ cfg := transformConfig {
290
+ transformScans : bufferedWritesScanTransformEnabled .Get (& twb .st .SV ),
291
+ transformGets : bufferedWritesGetTransformEnabled .Get (& twb .st .SV ),
292
+ }
278
293
279
- if twb .batchRequiresFlush (ctx , ba , transformScans ) {
294
+ if twb .batchRequiresFlush (ctx , ba , cfg ) {
280
295
return twb .flushBufferAndSendBatch (ctx , ba )
281
296
}
282
297
283
298
// Check if buffering writes from the supplied batch will run us over
284
299
// budget. If it will, we shouldn't buffer writes from the current batch,
285
300
// and flush the buffer.
286
301
maxSize := bufferedWritesMaxBufferSize .Get (& twb .st .SV )
287
- bufSize := twb .estimateSize (ba , transformScans ) + twb .bufferSize
302
+ bufSize := twb .estimateSize (ba , cfg ) + twb .bufferSize
288
303
289
304
// NB: if bufferedWritesMaxBufferSize is set to 0 then we effectively disable
290
305
// any buffer limiting.
@@ -302,7 +317,7 @@ func (twb *txnWriteBuffer) SendLocked(
302
317
return nil , kvpb .NewError (err )
303
318
}
304
319
305
- transformedBa , rr , pErr := twb .applyTransformations (ctx , ba , transformScans )
320
+ transformedBa , rr , pErr := twb .applyTransformations (ctx , ba , cfg )
306
321
if pErr != nil {
307
322
return nil , pErr
308
323
}
@@ -338,7 +353,7 @@ func (twb *txnWriteBuffer) SendLocked(
338
353
}
339
354
340
355
func (twb * txnWriteBuffer ) batchRequiresFlush (
341
- ctx context.Context , ba * kvpb.BatchRequest , transformScans bool ,
356
+ ctx context.Context , ba * kvpb.BatchRequest , _ transformConfig ,
342
357
) bool {
343
358
for _ , ru := range ba .Requests {
344
359
req := ru .GetInner ()
@@ -465,7 +480,7 @@ func unsupportedOptionError(m kvpb.Method, option string) error {
465
480
466
481
// estimateSize returns a conservative estimate by which the buffer will grow in
467
482
// size if the writes from the supplied batch request are buffered.
468
- func (twb * txnWriteBuffer ) estimateSize (ba * kvpb.BatchRequest , transformScans bool ) int64 {
483
+ func (twb * txnWriteBuffer ) estimateSize (ba * kvpb.BatchRequest , cfg transformConfig ) int64 {
469
484
var scratch bufferedWrite
470
485
scratch .vals = scratch .valsScratch [:1 ]
471
486
estimate := int64 (0 )
@@ -485,7 +500,7 @@ func (twb *txnWriteBuffer) estimateSize(ba *kvpb.BatchRequest, transformScans bo
485
500
estimate += scratch .size ()
486
501
estimate += lockKeyInfoSize
487
502
case * kvpb.GetRequest :
488
- if IsReplicatedLockingRequest (t ) {
503
+ if IsReplicatedLockingRequest (t ) && cfg . transformGets {
489
504
scratch .key = t .Key
490
505
estimate += scratch .size ()
491
506
estimate += lockKeyInfoSize
@@ -522,7 +537,7 @@ func (twb *txnWriteBuffer) estimateSize(ba *kvpb.BatchRequest, transformScans bo
522
537
// the buffer. Here, we assume at least 1 key will be returned that is
523
538
// about the size of the scan start boundary. We try to protect from large
524
539
// buffer overflows by transforming the batch's MaxSpanRequestKeys later.
525
- if IsReplicatedLockingRequest (t ) && transformScans {
540
+ if IsReplicatedLockingRequest (t ) && cfg . transformScans {
526
541
scratch .key = t .Key
527
542
scratch .vals [0 ] = bufferedValue {
528
543
seq : t .Sequence ,
@@ -531,7 +546,7 @@ func (twb *txnWriteBuffer) estimateSize(ba *kvpb.BatchRequest, transformScans bo
531
546
}
532
547
case * kvpb.ReverseScanRequest :
533
548
// See the comment on the ScanRequest case for more details.
534
- if IsReplicatedLockingRequest (t ) && transformScans {
549
+ if IsReplicatedLockingRequest (t ) && cfg . transformScans {
535
550
scratch .key = t .Key
536
551
scratch .vals [0 ] = bufferedValue {
537
552
seq : t .Sequence ,
@@ -830,7 +845,7 @@ func (twb *txnWriteBuffer) closeLocked() {}
830
845
// locking Get request if it can be served from the buffer (i.e if a lock of
831
846
// sufficient strength has been acquired and a value has been buffered).
832
847
func (twb * txnWriteBuffer ) applyTransformations (
833
- ctx context.Context , ba * kvpb.BatchRequest , transformScans bool ,
848
+ ctx context.Context , ba * kvpb.BatchRequest , cfg transformConfig ,
834
849
) (* kvpb.BatchRequest , requestRecords , * kvpb.Error ) {
835
850
baRemote := ba .ShallowCopy ()
836
851
// TODO(arul): We could improve performance here by pre-allocating
@@ -939,7 +954,7 @@ func (twb *txnWriteBuffer) applyTransformations(
939
954
requiresLockTransform := IsReplicatedLockingRequest (t )
940
955
requestRequired := requiresAdditionalLocking || ! served
941
956
942
- if requestRequired && requiresLockTransform {
957
+ if requestRequired && requiresLockTransform && cfg . transformGets {
943
958
var getReqU kvpb.RequestUnion
944
959
getReq := t .ShallowCopy ().(* kvpb.GetRequest )
945
960
getReq .KeyLockingDurability = lock .Unreplicated
@@ -957,7 +972,7 @@ func (twb *txnWriteBuffer) applyTransformations(
957
972
// Regardless of whether the scan overlaps with any writes in the buffer
958
973
// or not, we must send the request to the KV layer. We can't know for
959
974
// sure that there's nothing else to read.
960
- if IsReplicatedLockingRequest (t ) && transformScans {
975
+ if IsReplicatedLockingRequest (t ) && cfg . transformScans {
961
976
var scanReqU kvpb.RequestUnion
962
977
scanReq := t .ShallowCopy ().(* kvpb.ScanRequest )
963
978
scanReq .KeyLockingDurability = lock .Unreplicated
@@ -975,7 +990,7 @@ func (twb *txnWriteBuffer) applyTransformations(
975
990
// Regardless of whether the reverse scan overlaps with any writes in the
976
991
// buffer or not, we must send the request to the KV layer. We can't know
977
992
// for sure that there's nothing else to read.
978
- if IsReplicatedLockingRequest (t ) && transformScans {
993
+ if IsReplicatedLockingRequest (t ) && cfg . transformScans {
979
994
var rScanReqU kvpb.RequestUnion
980
995
rScanReq := t .ShallowCopy ().(* kvpb.ReverseScanRequest )
981
996
rScanReq .KeyLockingDurability = lock .Unreplicated
0 commit comments