Skip to content

Commit de90bde

Browse files
committed
kvcoord: minor txnWriteBuffer refactor
This makes some future changes we plan to make a bit easier. Release note: None Epic: none
1 parent c0b904a commit de90bde

File tree

1 file changed

+27
-19
lines changed

1 file changed

+27
-19
lines changed

pkg/kv/kvclient/kvcoord/txn_interceptor_write_buffer.go

Lines changed: 27 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -267,17 +267,18 @@ func (twb *txnWriteBuffer) SendLocked(
267267
return twb.flushBufferAndSendBatch(ctx, ba)
268268
}
269269

270-
if twb.batchRequiresFlush(ctx, ba) {
270+
// We check if scan transforms are enabled once and use that answer until the
271+
// end of SendLocked.
272+
transformScans := bufferedWritesScanTransformEnabled.Get(&twb.st.SV)
273+
274+
if twb.batchRequiresFlush(ctx, ba, transformScans) {
271275
return twb.flushBufferAndSendBatch(ctx, ba)
272276
}
273277

274278
// Check if buffering writes from the supplied batch will run us over
275279
// budget. If it will, we shouldn't buffer writes from the current batch,
276280
// and flush the buffer.
277281
maxSize := bufferedWritesMaxBufferSize.Get(&twb.st.SV)
278-
// We check if scan transforms are enabled once and use that answer until the
279-
// end of SendLocked.
280-
transformScans := bufferedWritesScanTransformEnabled.Get(&twb.st.SV)
281282
bufSize := twb.estimateSize(ba, transformScans) + twb.bufferSize
282283

283284
// NB: if bufferedWritesMaxBufferSize is set to 0 then we effectively disable
@@ -331,7 +332,9 @@ func (twb *txnWriteBuffer) SendLocked(
331332
return twb.mergeResponseWithRequestRecords(ctx, rr, br)
332333
}
333334

334-
func (twb *txnWriteBuffer) batchRequiresFlush(ctx context.Context, ba *kvpb.BatchRequest) bool {
335+
func (twb *txnWriteBuffer) batchRequiresFlush(
336+
ctx context.Context, ba *kvpb.BatchRequest, transformScans bool,
337+
) bool {
335338
for _, ru := range ba.Requests {
336339
req := ru.GetInner()
337340
switch req.(type) {
@@ -477,7 +480,7 @@ func (twb *txnWriteBuffer) estimateSize(ba *kvpb.BatchRequest, transformScans bo
477480
estimate += scratch.size()
478481
estimate += lockKeyInfoSize
479482
case *kvpb.GetRequest:
480-
if t.KeyLockingDurability == lock.Replicated {
483+
if IsReplicatedLockingRequest(t) {
481484
scratch.key = t.Key
482485
estimate += scratch.size()
483486
estimate += lockKeyInfoSize
@@ -514,9 +517,7 @@ func (twb *txnWriteBuffer) estimateSize(ba *kvpb.BatchRequest, transformScans bo
514517
// the buffer. Here, we assume at least 1 key will be returned that is
515518
// about the size of the scan start boundary. We try to protect from large
516519
// buffer overflows by transforming the batch's MaxSpanRequestKeys later.
517-
shouldTransform := t.KeyLockingStrength > lock.None && t.KeyLockingDurability == lock.Replicated
518-
shouldTransform = shouldTransform && transformScans
519-
if shouldTransform {
520+
if IsReplicatedLockingRequest(t) && transformScans {
520521
scratch.key = t.Key
521522
scratch.vals[0] = bufferedValue{
522523
seq: t.Sequence,
@@ -525,9 +526,7 @@ func (twb *txnWriteBuffer) estimateSize(ba *kvpb.BatchRequest, transformScans bo
525526
}
526527
case *kvpb.ReverseScanRequest:
527528
// See the comment on the ScanRequest case for more details.
528-
shouldTransform := t.KeyLockingStrength > lock.None && t.KeyLockingDurability == lock.Replicated
529-
shouldTransform = shouldTransform && transformScans
530-
if shouldTransform {
529+
if IsReplicatedLockingRequest(t) && transformScans {
531530
scratch.key = t.Key
532531
scratch.vals[0] = bufferedValue{
533532
seq: t.Sequence,
@@ -932,7 +931,7 @@ func (twb *txnWriteBuffer) applyTransformations(
932931
_, lockStr, served := twb.maybeServeRead(t.Key, t.Sequence)
933932

934933
requiresAdditionalLocking := t.KeyLockingStrength > lockStr
935-
requiresLockTransform := t.KeyLockingStrength != lock.None && t.KeyLockingDurability == lock.Replicated
934+
requiresLockTransform := IsReplicatedLockingRequest(t)
936935
requestRequired := requiresAdditionalLocking || !served
937936

938937
if requestRequired && requiresLockTransform {
@@ -953,9 +952,7 @@ func (twb *txnWriteBuffer) applyTransformations(
953952
// Regardless of whether the scan overlaps with any writes in the buffer
954953
// or not, we must send the request to the KV layer. We can't know for
955954
// sure that there's nothing else to read.
956-
shouldTransform := t.KeyLockingStrength > lock.None && t.KeyLockingDurability == lock.Replicated
957-
shouldTransform = shouldTransform && transformScans
958-
if shouldTransform {
955+
if IsReplicatedLockingRequest(t) && transformScans {
959956
var scanReqU kvpb.RequestUnion
960957
scanReq := t.ShallowCopy().(*kvpb.ScanRequest)
961958
scanReq.KeyLockingDurability = lock.Unreplicated
@@ -973,9 +970,7 @@ func (twb *txnWriteBuffer) applyTransformations(
973970
// Regardless of whether the reverse scan overlaps with any writes in the
974971
// buffer or not, we must send the request to the KV layer. We can't know
975972
// for sure that there's nothing else to read.
976-
shouldTransform := t.KeyLockingStrength > lock.None && t.KeyLockingDurability == lock.Replicated
977-
shouldTransform = shouldTransform && transformScans
978-
if shouldTransform {
973+
if IsReplicatedLockingRequest(t) && transformScans {
979974
var rScanReqU kvpb.RequestUnion
980975
rScanReq := t.ShallowCopy().(*kvpb.ReverseScanRequest)
981976
rScanReq.KeyLockingDurability = lock.Unreplicated
@@ -1635,6 +1630,19 @@ func (twb *txnWriteBuffer) addToBuffer(
16351630
}
16361631
}
16371632

1633+
func IsReplicatedLockingRequest(req kvpb.Request) bool {
1634+
switch r := req.(type) {
1635+
case *kvpb.ScanRequest:
1636+
return r.KeyLockingStrength > lock.None && r.KeyLockingDurability == lock.Replicated
1637+
case *kvpb.ReverseScanRequest:
1638+
return r.KeyLockingStrength > lock.None && r.KeyLockingDurability == lock.Replicated
1639+
case *kvpb.GetRequest:
1640+
return r.KeyLockingStrength > lock.None && r.KeyLockingDurability == lock.Replicated
1641+
default:
1642+
return false
1643+
}
1644+
}
1645+
16381646
// addDurableLockedReadToBuffer adds a locking read to the given buffer.
16391647
//
16401648
// TODO(ssd): Determine if we need to track the kvnemesis sequence number for

0 commit comments

Comments
 (0)