Skip to content

Commit c933e77

Browse files
committed
kvcoord: add lockedKeyInfo overhead to size estimate in txnWriteBuffer
We added this to the buffer but didn't add it to the size estimates so we might have been underestimating sizes. Epic: none Release note: None
1 parent eae6695 commit c933e77

File tree

2 files changed

+51
-15
lines changed

2 files changed

+51
-15
lines changed

pkg/kv/kvclient/kvcoord/txn_interceptor_write_buffer.go

Lines changed: 31 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -443,6 +443,7 @@ func (twb *txnWriteBuffer) estimateSize(ba *kvpb.BatchRequest) int64 {
443443
seq: t.Sequence,
444444
}
445445
estimate += scratch.size()
446+
estimate += lockKeyInfoSize
446447
case *kvpb.PutRequest:
447448
// NB: when estimating, we're being conservative by assuming the Put is to
448449
// a key that isn't already present in the buffer. If it were, we could
@@ -453,6 +454,9 @@ func (twb *txnWriteBuffer) estimateSize(ba *kvpb.BatchRequest) int64 {
453454
seq: t.Sequence,
454455
}
455456
estimate += scratch.size()
457+
if t.MustAcquireExclusiveLock {
458+
estimate += lockKeyInfoSize
459+
}
456460
case *kvpb.DeleteRequest:
457461
// NB: Similar to Put, we're assuming we're deleting a key that isn't
458462
// already present in the buffer.
@@ -461,6 +465,9 @@ func (twb *txnWriteBuffer) estimateSize(ba *kvpb.BatchRequest) int64 {
461465
seq: t.Sequence,
462466
}
463467
estimate += scratch.size()
468+
if t.MustAcquireExclusiveLock {
469+
estimate += lockKeyInfoSize
470+
}
464471
}
465472
// No other request is buffered.
466473
}
@@ -648,7 +655,10 @@ func (twb *txnWriteBuffer) rollbackToSavepointLocked(ctx context.Context, s save
648655
toDelete := make([]*bufferedWrite, 0)
649656
it := twb.buffer.MakeIter()
650657
for it.First(); it.Valid(); it.Next() {
651-
it.Cur().rollbackLockInfo(s.seqNum)
658+
if held := it.Cur().rollbackLockInfo(s.seqNum); !held {
659+
// If we aren't still held, update our buffer size.
660+
twb.bufferSize -= lockKeyInfoSize
661+
}
652662
bufferedVals := it.Cur().vals
653663
// NB: the savepoint is being rolled back to s.seqNum (inclusive). So,
654664
// idx is the index of the first value that is considered rolled back.
@@ -1346,7 +1356,9 @@ func (twb *txnWriteBuffer) addToBuffer(
13461356
val := bufferedValue{val: val, seq: seq, kvNemesisSeq: kvNemSeq}
13471357
bw.vals = append(bw.vals, val)
13481358
if lockInfo != nil {
1349-
bw.acquireLock(lockInfo)
1359+
if firstAcquisition := bw.acquireLock(lockInfo); firstAcquisition {
1360+
twb.bufferSize += lockKeyInfoSize
1361+
}
13501362
}
13511363
twb.bufferSize += val.size()
13521364
} else {
@@ -1511,14 +1523,21 @@ func (bw *bufferedWrite) size() int64 {
15111523
for _, v := range bw.vals {
15121524
size += v.size()
15131525
}
1526+
if bw.lki != nil {
1527+
size += lockKeyInfoSize
1528+
}
15141529
return size
15151530
}
15161531

1517-
func (bw *bufferedWrite) acquireLock(li *lockAcquisition) {
1532+
// acquireLock updates the lock information for this buffered write. It returns
1533+
// true if this is the first lock acquisition.
1534+
func (bw *bufferedWrite) acquireLock(li *lockAcquisition) bool {
15181535
if bw.lki == nil {
15191536
bw.lki = newLockedKeyInfo(li.str, li.seq, li.ts)
1537+
return true
15201538
} else {
15211539
bw.lki.acquireLock(li.str, li.seq, li.ts)
1540+
return false
15221541
}
15231542
}
15241543

@@ -1539,14 +1558,18 @@ func (bw *bufferedWrite) heldStr(seq enginepb.TxnSeq) lock.Strength {
15391558
return bw.lki.heldStr(seq)
15401559
}
15411560

1542-
func (bw *bufferedWrite) rollbackLockInfo(seq enginepb.TxnSeq) {
1561+
// rollbackLockInfo updates the lock information based on the rollback. It
1562+
// returns true if the lock is still held.
1563+
func (bw *bufferedWrite) rollbackLockInfo(seq enginepb.TxnSeq) bool {
15431564
if bw.lki == nil {
1544-
return
1565+
return false
15451566
}
15461567

1547-
if !bw.lki.rollbackSequence(seq) {
1568+
stillHeld := bw.lki.rollbackSequence(seq)
1569+
if !stillHeld {
15481570
bw.lki = nil
15491571
}
1572+
return stillHeld
15501573
}
15511574

15521575
const bufferedValueStructOverhead = int64(unsafe.Sizeof(bufferedValue{}))
@@ -1685,6 +1708,8 @@ type lockedKeyInfo struct {
16851708
ts hlc.Timestamp
16861709
}
16871710

1711+
var lockKeyInfoSize = int64(unsafe.Sizeof(lockedKeyInfo{}))
1712+
16881713
func newLockedKeyInfo(str lock.Strength, seqNum enginepb.TxnSeq, ts hlc.Timestamp) *lockedKeyInfo {
16891714
lki := &lockedKeyInfo{
16901715
ts: ts,

pkg/kv/kvclient/kvcoord/txn_interceptor_write_buffer_test.go

Lines changed: 20 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1406,17 +1406,23 @@ func TestTxnWriteBufferEstimateSize(t *testing.T) {
14061406
putA := putArgs(keyA, valAStr, txn.Sequence)
14071407
ba.Add(putA)
14081408

1409-
require.Equal(t,
1410-
int64(len(keyA)+len(valA.RawBytes))+bufferedWriteStructOverhead+bufferedValueStructOverhead,
1411-
twb.estimateSize(ba),
1412-
)
1409+
expectedUnlockedPutSize := int64(len(keyA)+len(valA.RawBytes)) + bufferedWriteStructOverhead + bufferedValueStructOverhead
1410+
require.Equal(t, expectedUnlockedPutSize, twb.estimateSize(ba))
1411+
1412+
ba = &kvpb.BatchRequest{}
1413+
ba.Header = kvpb.Header{Txn: &txn}
1414+
putA = putArgs(keyA, valAStr, txn.Sequence)
1415+
putA.MustAcquireExclusiveLock = true
1416+
ba.Add(putA)
1417+
1418+
require.Equal(t, expectedUnlockedPutSize+lockKeyInfoSize, twb.estimateSize(ba))
14131419

14141420
ba = &kvpb.BatchRequest{}
14151421
cputLarge := cputArgs(keyLarge, valLargeStr, "", txn.Sequence)
14161422
ba.Add(cputLarge)
14171423

14181424
require.Equal(t,
1419-
int64(len(keyLarge)+len(valLarge.RawBytes))+bufferedWriteStructOverhead+bufferedValueStructOverhead,
1425+
int64(len(keyLarge)+len(valLarge.RawBytes))+bufferedWriteStructOverhead+bufferedValueStructOverhead+lockKeyInfoSize,
14201426
twb.estimateSize(ba),
14211427
)
14221428

@@ -1426,10 +1432,15 @@ func TestTxnWriteBufferEstimateSize(t *testing.T) {
14261432

14271433
// NB: note that we're overcounting here, as we're deleting a key that's
14281434
// already present in the buffer. But that's what estimating is about.
1429-
require.Equal(t,
1430-
int64(len(keyA))+bufferedWriteStructOverhead+bufferedValueStructOverhead,
1431-
twb.estimateSize(ba),
1432-
)
1435+
expectedUnlockedDelSize := int64(len(keyA)) + bufferedWriteStructOverhead + bufferedValueStructOverhead
1436+
require.Equal(t, expectedUnlockedDelSize, twb.estimateSize(ba))
1437+
1438+
ba = &kvpb.BatchRequest{}
1439+
delA = delArgs(keyA, txn.Sequence)
1440+
delA.MustAcquireExclusiveLock = true
1441+
ba.Add(delA)
1442+
1443+
require.Equal(t, expectedUnlockedDelSize+lockKeyInfoSize, twb.estimateSize(ba))
14331444
}
14341445

14351446
// TestTxnWriteBufferFlushesWhenOverBudget verifies that the txnWriteBuffer

0 commit comments

Comments
 (0)