Skip to content

Commit 5949c6b

Browse files
craig[bot]stevendanna
andcommitted
Merge #150404
150404: kvcoord: common changes for multiple branches of in-flight work r=miraradeva a=stevendanna See the individual commits for details.. I'd like to merge this and backport it to 25.3 separately to make it easier to it a bit easier to manage this series of changes. Epic: None Release note: None Release justification: Minor changes and test changes we are making in advance of more critical bug fixes. All changes are to a settings-gated, off-by-default preview feature. Co-authored-by: Steven Danna <[email protected]>
2 parents c0b904a + 599a055 commit 5949c6b

File tree

2 files changed

+76
-63
lines changed

2 files changed

+76
-63
lines changed

pkg/kv/kvclient/kvcoord/txn_interceptor_write_buffer.go

Lines changed: 34 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -48,15 +48,20 @@ var bufferedWritesScanTransformEnabled = settings.RegisterBoolSetting(
4848
settings.ApplicationLevel,
4949
"kv.transaction.write_buffering.transformations.scans.enabled",
5050
"if enabled, locking scans and reverse scans with replicated durability are transformed to unreplicated durability",
51-
true,
51+
metamorphic.ConstantWithTestBool("kv.transaction.write_buffering.transformations.scans.enabled", true /* defaultValue */),
5252
)
5353

54+
const defaultBufferSize = 1 << 22 // 4MB
5455
var bufferedWritesMaxBufferSize = settings.RegisterByteSizeSetting(
5556
settings.ApplicationLevel,
5657
"kv.transaction.write_buffering.max_buffer_size",
5758
"if non-zero, defines that maximum size of the "+
5859
"buffer that will be used to buffer transactional writes per-transaction",
59-
1<<22, // 4MB
60+
int64(metamorphic.ConstantWithTestRange("kv.transaction.write_buffering.max_buffer_size",
61+
defaultBufferSize, // default
62+
1, // min
63+
defaultBufferSize, // max
64+
)),
6065
settings.NonNegativeInt,
6166
settings.WithPublic,
6267
)
@@ -267,17 +272,18 @@ func (twb *txnWriteBuffer) SendLocked(
267272
return twb.flushBufferAndSendBatch(ctx, ba)
268273
}
269274

270-
if twb.batchRequiresFlush(ctx, ba) {
275+
// We check if scan transforms are enabled once and use that answer until the
276+
// end of SendLocked.
277+
transformScans := bufferedWritesScanTransformEnabled.Get(&twb.st.SV)
278+
279+
if twb.batchRequiresFlush(ctx, ba, transformScans) {
271280
return twb.flushBufferAndSendBatch(ctx, ba)
272281
}
273282

274283
// Check if buffering writes from the supplied batch will run us over
275284
// budget. If it will, we shouldn't buffer writes from the current batch,
276285
// and flush the buffer.
277286
maxSize := bufferedWritesMaxBufferSize.Get(&twb.st.SV)
278-
// We check if scan transforms are enabled once and use that answer until the
279-
// end of SendLocked.
280-
transformScans := bufferedWritesScanTransformEnabled.Get(&twb.st.SV)
281287
bufSize := twb.estimateSize(ba, transformScans) + twb.bufferSize
282288

283289
// NB: if bufferedWritesMaxBufferSize is set to 0 then we effectively disable
@@ -331,7 +337,9 @@ func (twb *txnWriteBuffer) SendLocked(
331337
return twb.mergeResponseWithRequestRecords(ctx, rr, br)
332338
}
333339

334-
func (twb *txnWriteBuffer) batchRequiresFlush(ctx context.Context, ba *kvpb.BatchRequest) bool {
340+
func (twb *txnWriteBuffer) batchRequiresFlush(
341+
ctx context.Context, ba *kvpb.BatchRequest, transformScans bool,
342+
) bool {
335343
for _, ru := range ba.Requests {
336344
req := ru.GetInner()
337345
switch req.(type) {
@@ -477,7 +485,7 @@ func (twb *txnWriteBuffer) estimateSize(ba *kvpb.BatchRequest, transformScans bo
477485
estimate += scratch.size()
478486
estimate += lockKeyInfoSize
479487
case *kvpb.GetRequest:
480-
if t.KeyLockingDurability == lock.Replicated {
488+
if IsReplicatedLockingRequest(t) {
481489
scratch.key = t.Key
482490
estimate += scratch.size()
483491
estimate += lockKeyInfoSize
@@ -514,9 +522,7 @@ func (twb *txnWriteBuffer) estimateSize(ba *kvpb.BatchRequest, transformScans bo
514522
// the buffer. Here, we assume at least 1 key will be returned that is
515523
// about the size of the scan start boundary. We try to protect from large
516524
// buffer overflows by transforming the batch's MaxSpanRequestKeys later.
517-
shouldTransform := t.KeyLockingStrength > lock.None && t.KeyLockingDurability == lock.Replicated
518-
shouldTransform = shouldTransform && transformScans
519-
if shouldTransform {
525+
if IsReplicatedLockingRequest(t) && transformScans {
520526
scratch.key = t.Key
521527
scratch.vals[0] = bufferedValue{
522528
seq: t.Sequence,
@@ -525,9 +531,7 @@ func (twb *txnWriteBuffer) estimateSize(ba *kvpb.BatchRequest, transformScans bo
525531
}
526532
case *kvpb.ReverseScanRequest:
527533
// See the comment on the ScanRequest case for more details.
528-
shouldTransform := t.KeyLockingStrength > lock.None && t.KeyLockingDurability == lock.Replicated
529-
shouldTransform = shouldTransform && transformScans
530-
if shouldTransform {
534+
if IsReplicatedLockingRequest(t) && transformScans {
531535
scratch.key = t.Key
532536
scratch.vals[0] = bufferedValue{
533537
seq: t.Sequence,
@@ -932,7 +936,7 @@ func (twb *txnWriteBuffer) applyTransformations(
932936
_, lockStr, served := twb.maybeServeRead(t.Key, t.Sequence)
933937

934938
requiresAdditionalLocking := t.KeyLockingStrength > lockStr
935-
requiresLockTransform := t.KeyLockingStrength != lock.None && t.KeyLockingDurability == lock.Replicated
939+
requiresLockTransform := IsReplicatedLockingRequest(t)
936940
requestRequired := requiresAdditionalLocking || !served
937941

938942
if requestRequired && requiresLockTransform {
@@ -953,9 +957,7 @@ func (twb *txnWriteBuffer) applyTransformations(
953957
// Regardless of whether the scan overlaps with any writes in the buffer
954958
// or not, we must send the request to the KV layer. We can't know for
955959
// sure that there's nothing else to read.
956-
shouldTransform := t.KeyLockingStrength > lock.None && t.KeyLockingDurability == lock.Replicated
957-
shouldTransform = shouldTransform && transformScans
958-
if shouldTransform {
960+
if IsReplicatedLockingRequest(t) && transformScans {
959961
var scanReqU kvpb.RequestUnion
960962
scanReq := t.ShallowCopy().(*kvpb.ScanRequest)
961963
scanReq.KeyLockingDurability = lock.Unreplicated
@@ -973,9 +975,7 @@ func (twb *txnWriteBuffer) applyTransformations(
973975
// Regardless of whether the reverse scan overlaps with any writes in the
974976
// buffer or not, we must send the request to the KV layer. We can't know
975977
// for sure that there's nothing else to read.
976-
shouldTransform := t.KeyLockingStrength > lock.None && t.KeyLockingDurability == lock.Replicated
977-
shouldTransform = shouldTransform && transformScans
978-
if shouldTransform {
978+
if IsReplicatedLockingRequest(t) && transformScans {
979979
var rScanReqU kvpb.RequestUnion
980980
rScanReq := t.ShallowCopy().(*kvpb.ReverseScanRequest)
981981
rScanReq.KeyLockingDurability = lock.Unreplicated
@@ -1635,6 +1635,19 @@ func (twb *txnWriteBuffer) addToBuffer(
16351635
}
16361636
}
16371637

1638+
func IsReplicatedLockingRequest(req kvpb.Request) bool {
1639+
switch r := req.(type) {
1640+
case *kvpb.ScanRequest:
1641+
return r.KeyLockingStrength > lock.None && r.KeyLockingDurability == lock.Replicated
1642+
case *kvpb.ReverseScanRequest:
1643+
return r.KeyLockingStrength > lock.None && r.KeyLockingDurability == lock.Replicated
1644+
case *kvpb.GetRequest:
1645+
return r.KeyLockingStrength > lock.None && r.KeyLockingDurability == lock.Replicated
1646+
default:
1647+
return false
1648+
}
1649+
}
1650+
16381651
// addDurableLockedReadToBuffer adds a locking read to the given buffer.
16391652
//
16401653
// TODO(ssd): Determine if we need to track the kvnemesis sequence number for

0 commit comments

Comments
 (0)