Skip to content

Commit c2fc2b8

Browse files
committed
kvcoord: make GetRequest transforms configurable
Making this metamorphic might help us discover other edge cases. Also, it gives us the option to flip this off by default on 25.3 if we choose to. Release note: None Epic: none
1 parent 48c636e commit c2fc2b8

File tree

2 files changed

+46
-24
lines changed

2 files changed

+46
-24
lines changed

pkg/kv/kvclient/kvcoord/txn_interceptor_write_buffer.go

Lines changed: 28 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,13 @@ var bufferedWritesScanTransformEnabled = settings.RegisterBoolSetting(
5151
metamorphic.ConstantWithTestBool("kv.transaction.write_buffering.transformations.scans.enabled", true /* defaultValue */),
5252
)
5353

54+
var bufferedWritesGetTransformEnabled = settings.RegisterBoolSetting(
55+
settings.ApplicationLevel,
56+
"kv.transaction.write_buffering.transformations.get.enabled",
57+
"if enabled, locking get requests with replicated durability are transformed to unreplicated durability",
58+
metamorphic.ConstantWithTestBool("kv.transaction.write_buffering.transformations.get.enabled", true /* defaultValue */),
59+
)
60+
5461
const defaultBufferSize = 1 << 22 // 4MB
5562
var bufferedWritesMaxBufferSize = settings.RegisterByteSizeSetting(
5663
settings.ApplicationLevel,
@@ -222,6 +229,11 @@ type txnWriteBuffer struct {
222229
testingOverrideCPutEvalFn func(expBytes []byte, actVal *roachpb.Value, actValPresent bool, allowNoExisting bool) *kvpb.ConditionFailedError
223230
}
224231

232+
type transformConfig struct {
233+
transformScans bool
234+
transformGets bool
235+
}
236+
225237
type pipelineEnabler interface {
226238
enableImplicitPipelining()
227239
}
@@ -274,17 +286,20 @@ func (twb *txnWriteBuffer) SendLocked(
274286

275287
// We check if scan transforms are enabled once and use that answer until the
276288
// end of SendLocked.
277-
transformScans := bufferedWritesScanTransformEnabled.Get(&twb.st.SV)
289+
cfg := transformConfig{
290+
transformScans: bufferedWritesScanTransformEnabled.Get(&twb.st.SV),
291+
transformGets: bufferedWritesGetTransformEnabled.Get(&twb.st.SV),
292+
}
278293

279-
if twb.batchRequiresFlush(ctx, ba, transformScans) {
294+
if twb.batchRequiresFlush(ctx, ba, cfg) {
280295
return twb.flushBufferAndSendBatch(ctx, ba)
281296
}
282297

283298
// Check if buffering writes from the supplied batch will run us over
284299
// budget. If it will, we shouldn't buffer writes from the current batch,
285300
// and flush the buffer.
286301
maxSize := bufferedWritesMaxBufferSize.Get(&twb.st.SV)
287-
bufSize := twb.estimateSize(ba, transformScans) + twb.bufferSize
302+
bufSize := twb.estimateSize(ba, cfg) + twb.bufferSize
288303

289304
// NB: if bufferedWritesMaxBufferSize is set to 0 then we effectively disable
290305
// any buffer limiting.
@@ -302,7 +317,7 @@ func (twb *txnWriteBuffer) SendLocked(
302317
return nil, kvpb.NewError(err)
303318
}
304319

305-
transformedBa, rr, pErr := twb.applyTransformations(ctx, ba, transformScans)
320+
transformedBa, rr, pErr := twb.applyTransformations(ctx, ba, cfg)
306321
if pErr != nil {
307322
return nil, pErr
308323
}
@@ -338,7 +353,7 @@ func (twb *txnWriteBuffer) SendLocked(
338353
}
339354

340355
func (twb *txnWriteBuffer) batchRequiresFlush(
341-
ctx context.Context, ba *kvpb.BatchRequest, transformScans bool,
356+
ctx context.Context, ba *kvpb.BatchRequest, _ transformConfig,
342357
) bool {
343358
for _, ru := range ba.Requests {
344359
req := ru.GetInner()
@@ -465,7 +480,7 @@ func unsupportedOptionError(m kvpb.Method, option string) error {
465480

466481
// estimateSize returns a conservative estimate by which the buffer will grow in
467482
// size if the writes from the supplied batch request are buffered.
468-
func (twb *txnWriteBuffer) estimateSize(ba *kvpb.BatchRequest, transformScans bool) int64 {
483+
func (twb *txnWriteBuffer) estimateSize(ba *kvpb.BatchRequest, cfg transformConfig) int64 {
469484
var scratch bufferedWrite
470485
scratch.vals = scratch.valsScratch[:1]
471486
estimate := int64(0)
@@ -485,7 +500,7 @@ func (twb *txnWriteBuffer) estimateSize(ba *kvpb.BatchRequest, transformScans bo
485500
estimate += scratch.size()
486501
estimate += lockKeyInfoSize
487502
case *kvpb.GetRequest:
488-
if IsReplicatedLockingRequest(t) {
503+
if IsReplicatedLockingRequest(t) && cfg.transformGets {
489504
scratch.key = t.Key
490505
estimate += scratch.size()
491506
estimate += lockKeyInfoSize
@@ -522,7 +537,7 @@ func (twb *txnWriteBuffer) estimateSize(ba *kvpb.BatchRequest, transformScans bo
522537
// the buffer. Here, we assume at least 1 key will be returned that is
523538
// about the size of the scan start boundary. We try to protect from large
524539
// buffer overflows by transforming the batch's MaxSpanRequestKeys later.
525-
if IsReplicatedLockingRequest(t) && transformScans {
540+
if IsReplicatedLockingRequest(t) && cfg.transformScans {
526541
scratch.key = t.Key
527542
scratch.vals[0] = bufferedValue{
528543
seq: t.Sequence,
@@ -531,7 +546,7 @@ func (twb *txnWriteBuffer) estimateSize(ba *kvpb.BatchRequest, transformScans bo
531546
}
532547
case *kvpb.ReverseScanRequest:
533548
// See the comment on the ScanRequest case for more details.
534-
if IsReplicatedLockingRequest(t) && transformScans {
549+
if IsReplicatedLockingRequest(t) && cfg.transformScans {
535550
scratch.key = t.Key
536551
scratch.vals[0] = bufferedValue{
537552
seq: t.Sequence,
@@ -830,7 +845,7 @@ func (twb *txnWriteBuffer) closeLocked() {}
830845
// locking Get request if it can be served from the buffer (i.e if a lock of
831846
// sufficient strength has been acquired and a value has been buffered).
832847
func (twb *txnWriteBuffer) applyTransformations(
833-
ctx context.Context, ba *kvpb.BatchRequest, transformScans bool,
848+
ctx context.Context, ba *kvpb.BatchRequest, cfg transformConfig,
834849
) (*kvpb.BatchRequest, requestRecords, *kvpb.Error) {
835850
baRemote := ba.ShallowCopy()
836851
// TODO(arul): We could improve performance here by pre-allocating
@@ -939,7 +954,7 @@ func (twb *txnWriteBuffer) applyTransformations(
939954
requiresLockTransform := IsReplicatedLockingRequest(t)
940955
requestRequired := requiresAdditionalLocking || !served
941956

942-
if requestRequired && requiresLockTransform {
957+
if requestRequired && requiresLockTransform && cfg.transformGets {
943958
var getReqU kvpb.RequestUnion
944959
getReq := t.ShallowCopy().(*kvpb.GetRequest)
945960
getReq.KeyLockingDurability = lock.Unreplicated
@@ -957,7 +972,7 @@ func (twb *txnWriteBuffer) applyTransformations(
957972
// Regardless of whether the scan overlaps with any writes in the buffer
958973
// or not, we must send the request to the KV layer. We can't know for
959974
// sure that there's nothing else to read.
960-
if IsReplicatedLockingRequest(t) && transformScans {
975+
if IsReplicatedLockingRequest(t) && cfg.transformScans {
961976
var scanReqU kvpb.RequestUnion
962977
scanReq := t.ShallowCopy().(*kvpb.ScanRequest)
963978
scanReq.KeyLockingDurability = lock.Unreplicated
@@ -975,7 +990,7 @@ func (twb *txnWriteBuffer) applyTransformations(
975990
// Regardless of whether the reverse scan overlaps with any writes in the
976991
// buffer or not, we must send the request to the KV layer. We can't know
977992
// for sure that there's nothing else to read.
978-
if IsReplicatedLockingRequest(t) && transformScans {
993+
if IsReplicatedLockingRequest(t) && cfg.transformScans {
979994
var rScanReqU kvpb.RequestUnion
980995
rScanReq := t.ShallowCopy().(*kvpb.ReverseScanRequest)
981996
rScanReq.KeyLockingDurability = lock.Unreplicated

pkg/kv/kvclient/kvcoord/txn_interceptor_write_buffer_test.go

Lines changed: 18 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ func makeMockTxnWriteBuffer(
3131
) (txnWriteBuffer, *mockLockedSender, *cluster.Settings) {
3232
st := cluster.MakeClusterSettings()
3333
bufferedWritesScanTransformEnabled.Override(ctx, &st.SV, true)
34+
bufferedWritesGetTransformEnabled.Override(ctx, &st.SV, true)
3435
bufferedWritesMaxBufferSize.Override(ctx, &st.SV, defaultBufferSize)
3536

3637
var metrics TxnMetrics
@@ -1406,6 +1407,12 @@ func TestTxnWriteBufferEstimateSize(t *testing.T) {
14061407
txn.Sequence = 10
14071408
keyA := roachpb.Key("a")
14081409
keyB := roachpb.Key("b")
1410+
cfg := transformConfig{
1411+
transformScans: true,
1412+
transformGets: true,
1413+
}
1414+
noScanCfg := cfg
1415+
noScanCfg.transformScans = false
14091416

14101417
valAStr := "valA"
14111418
valA := roachpb.MakeValueFromString(valAStr)
@@ -1419,23 +1426,23 @@ func TestTxnWriteBufferEstimateSize(t *testing.T) {
14191426
ba.Add(putA)
14201427

14211428
expectedUnlockedPutSize := int64(len(keyA)+len(valA.RawBytes)) + bufferedWriteStructOverhead + bufferedValueStructOverhead
1422-
require.Equal(t, expectedUnlockedPutSize, twb.estimateSize(ba, true))
1429+
require.Equal(t, expectedUnlockedPutSize, twb.estimateSize(ba, cfg))
14231430

14241431
ba = &kvpb.BatchRequest{}
14251432
ba.Header = kvpb.Header{Txn: &txn}
14261433
putA = putArgs(keyA, valAStr, txn.Sequence)
14271434
putA.MustAcquireExclusiveLock = true
14281435
ba.Add(putA)
14291436

1430-
require.Equal(t, expectedUnlockedPutSize+lockKeyInfoSize, twb.estimateSize(ba, true))
1437+
require.Equal(t, expectedUnlockedPutSize+lockKeyInfoSize, twb.estimateSize(ba, cfg))
14311438

14321439
ba = &kvpb.BatchRequest{}
14331440
cputLarge := cputArgs(keyLarge, valLargeStr, "", txn.Sequence)
14341441
ba.Add(cputLarge)
14351442

14361443
require.Equal(t,
14371444
int64(len(keyLarge)+len(valLarge.RawBytes))+bufferedWriteStructOverhead+bufferedValueStructOverhead+lockKeyInfoSize,
1438-
twb.estimateSize(ba, true),
1445+
twb.estimateSize(ba, cfg),
14391446
)
14401447

14411448
ba = &kvpb.BatchRequest{}
@@ -1445,14 +1452,14 @@ func TestTxnWriteBufferEstimateSize(t *testing.T) {
14451452
// NB: note that we're overcounting here, as we're deleting a key that's
14461453
// already present in the buffer. But that's what estimating is about.
14471454
expectedUnlockedDelSize := int64(len(keyA)) + bufferedWriteStructOverhead + bufferedValueStructOverhead
1448-
require.Equal(t, expectedUnlockedDelSize, twb.estimateSize(ba, true))
1455+
require.Equal(t, expectedUnlockedDelSize, twb.estimateSize(ba, cfg))
14491456

14501457
ba = &kvpb.BatchRequest{}
14511458
delA = delArgs(keyA, txn.Sequence)
14521459
delA.MustAcquireExclusiveLock = true
14531460
ba.Add(delA)
14541461

1455-
require.Equal(t, expectedUnlockedDelSize+lockKeyInfoSize, twb.estimateSize(ba, true))
1462+
require.Equal(t, expectedUnlockedDelSize+lockKeyInfoSize, twb.estimateSize(ba, cfg))
14561463

14571464
ba = &kvpb.BatchRequest{}
14581465
ba.Add(&kvpb.ScanRequest{
@@ -1462,8 +1469,8 @@ func TestTxnWriteBufferEstimateSize(t *testing.T) {
14621469
})
14631470

14641471
expectedLockedScanSize := int64(len(keyA)) + bufferedWriteStructOverhead + bufferedValueStructOverhead + lockKeyInfoSize
1465-
require.Equal(t, expectedLockedScanSize, twb.estimateSize(ba, true))
1466-
require.Equal(t, int64(0), twb.estimateSize(ba, false))
1472+
require.Equal(t, expectedLockedScanSize, twb.estimateSize(ba, cfg))
1473+
require.Equal(t, int64(0), twb.estimateSize(ba, noScanCfg))
14671474

14681475
ba = &kvpb.BatchRequest{}
14691476
ba.Add(&kvpb.ReverseScanRequest{
@@ -1473,24 +1480,24 @@ func TestTxnWriteBufferEstimateSize(t *testing.T) {
14731480
})
14741481

14751482
expectedLockedRScanSize := int64(len(keyA)) + bufferedWriteStructOverhead + bufferedValueStructOverhead + lockKeyInfoSize
1476-
require.Equal(t, expectedLockedRScanSize, twb.estimateSize(ba, true))
1477-
require.Equal(t, int64(0), twb.estimateSize(ba, false))
1483+
require.Equal(t, expectedLockedRScanSize, twb.estimateSize(ba, cfg))
1484+
require.Equal(t, int64(0), twb.estimateSize(ba, noScanCfg))
14781485

14791486
ba = &kvpb.BatchRequest{}
14801487
ba.Add(&kvpb.ScanRequest{
14811488
KeyLockingStrength: lock.Exclusive,
14821489
KeyLockingDurability: lock.Unreplicated,
14831490
RequestHeader: kvpb.RequestHeader{Key: keyA, EndKey: keyB, Sequence: txn.Sequence},
14841491
})
1485-
require.Equal(t, int64(0), twb.estimateSize(ba, true))
1492+
require.Equal(t, int64(0), twb.estimateSize(ba, cfg))
14861493

14871494
ba = &kvpb.BatchRequest{}
14881495
ba.Add(&kvpb.ReverseScanRequest{
14891496
KeyLockingStrength: lock.Exclusive,
14901497
KeyLockingDurability: lock.Unreplicated,
14911498
RequestHeader: kvpb.RequestHeader{Key: keyA, EndKey: keyB, Sequence: txn.Sequence},
14921499
})
1493-
require.Equal(t, int64(0), twb.estimateSize(ba, true))
1500+
require.Equal(t, int64(0), twb.estimateSize(ba, cfg))
14941501
}
14951502

14961503
// TestTxnWriteBufferFlushesWhenOverBudget verifies that the txnWriteBuffer

0 commit comments

Comments
 (0)