Skip to content

Commit d59016c

Browse files
craig[bot]stevendanna
andcommitted
Merge #143304
143304: kvclient: fix small bug in txnWriteBuffer r=yuzefovich a=stevendanna Previously, we were not resetting the endKey before starting our iteration. This could result in the iterator ending before reaching our desired key despite it being in the buffer. Epic: none Release note: None Co-authored-by: Steven Danna <[email protected]>
2 parents e5dd167 + 7c03dcf commit d59016c

File tree

2 files changed

+72
-13
lines changed

2 files changed

+72
-13
lines changed

pkg/kv/kvclient/kvcoord/txn_interceptor_write_buffer.go

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -503,6 +503,15 @@ func (twb *txnWriteBuffer) applyTransformations(
503503
return baRemote, ts
504504
}
505505

506+
// seekItemForSpan returns a bufferedWrite appropriate for use with a
507+
// write-buffer iterator. Point lookups should use a nil end key.
508+
func (twb *txnWriteBuffer) seekItemForSpan(key, endKey roachpb.Key) *bufferedWrite {
509+
seek := &twb.bufferSeek
510+
seek.key = key
511+
seek.endKey = endKey
512+
return seek
513+
}
514+
506515
// maybeServeRead serves the supplied read request from the buffer if a write or
507516
// deletion tombstone on the key is present in the buffer. Additionally, a
508517
// boolean indicating whether the read request was served or not is also
@@ -511,9 +520,7 @@ func (twb *txnWriteBuffer) maybeServeRead(
511520
key roachpb.Key, seq enginepb.TxnSeq,
512521
) (*roachpb.Value, bool) {
513522
it := twb.buffer.MakeIter()
514-
seek := &twb.bufferSeek
515-
seek.key = key
516-
523+
seek := twb.seekItemForSpan(key, nil)
517524
it.FirstOverlap(seek)
518525
if it.Valid() {
519526
bufferedVals := it.Cur().vals
@@ -543,11 +550,7 @@ func (twb *txnWriteBuffer) maybeServeRead(
543550
// write.
544551
func (twb *txnWriteBuffer) scanOverlaps(key roachpb.Key, endKey roachpb.Key) bool {
545552
it := twb.buffer.MakeIter()
546-
seek := &twb.bufferSeek
547-
seek.key = key
548-
seek.endKey = endKey
549-
550-
it.FirstOverlap(seek)
553+
it.FirstOverlap(twb.seekItemForSpan(key, endKey))
551554
return it.Valid()
552555
}
553556

@@ -617,9 +620,7 @@ func (twb *txnWriteBuffer) mergeBufferAndResp(
617620
reverse bool,
618621
) {
619622
it := twb.buffer.MakeIter()
620-
seek := &twb.bufferSeek
621-
seek.key = respIter.startKey()
622-
seek.endKey = respIter.endKey()
623+
seek := twb.seekItemForSpan(respIter.startKey(), respIter.endKey())
623624

624625
if reverse {
625626
it.LastOverlap(seek)
@@ -871,8 +872,7 @@ func (t transformations) Empty() bool {
871872
// addToBuffer adds a write to the given key to the buffer.
872873
func (twb *txnWriteBuffer) addToBuffer(key roachpb.Key, val roachpb.Value, seq enginepb.TxnSeq) {
873874
it := twb.buffer.MakeIter()
874-
seek := &twb.bufferSeek
875-
seek.key = key
875+
seek := twb.seekItemForSpan(key, nil)
876876

877877
it.FirstOverlap(seek)
878878
if it.Valid() {

pkg/kv/kvclient/kvcoord/txn_interceptor_write_buffer_test.go

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -671,6 +671,65 @@ func TestTxnWriteBufferServesPointReadsLocally(t *testing.T) {
671671
require.IsType(t, &kvpb.EndTxnResponse{}, br.Responses[0].GetInner())
672672
}
673673

674+
// TestTxnWriteBufferServesPointReadsAfterScan is a regression test
675+
// for a bug in which reused iterator state resulted in the end key of
676+
// a scan affecting subsequent GetRequests.
677+
func TestTxnWriteBufferServesPointReadsAfterScan(t *testing.T) {
678+
defer leaktest.AfterTest(t)()
679+
defer log.Scope(t).Close(t)
680+
ctx := context.Background()
681+
twb, mockSender := makeMockTxnWriteBuffer()
682+
683+
txn := makeTxnProto()
684+
txn.Sequence = 10
685+
keyA, keyB, keyC := roachpb.Key("a"), roachpb.Key("b"), roachpb.Key("c")
686+
687+
ba := &kvpb.BatchRequest{}
688+
ba.Header = kvpb.Header{Txn: &txn}
689+
ba.Add(putArgs(keyA, "valA", txn.Sequence))
690+
ba.Add(putArgs(keyB, "valB", txn.Sequence))
691+
ba.Add(putArgs(keyC, "valC", txn.Sequence))
692+
numCalled := mockSender.NumCalled()
693+
br, pErr := twb.SendLocked(ctx, ba)
694+
require.Nil(t, pErr)
695+
require.NotNil(t, br)
696+
// All writes should be buffered.
697+
require.Equal(t, numCalled, mockSender.NumCalled())
698+
699+
// First, read [a, c) via ScanRequest.
700+
txn.Sequence = 10
701+
ba = &kvpb.BatchRequest{}
702+
ba.Header = kvpb.Header{Txn: &txn}
703+
ba.Add(&kvpb.ScanRequest{
704+
RequestHeader: kvpb.RequestHeader{Key: keyA, EndKey: keyC, Sequence: txn.Sequence},
705+
})
706+
mockSender.MockSend(func(ba *kvpb.BatchRequest) (*kvpb.BatchResponse, *kvpb.Error) {
707+
require.Len(t, ba.Requests, 1)
708+
require.IsType(t, &kvpb.ScanRequest{}, ba.Requests[0].GetInner())
709+
br = ba.CreateReply()
710+
br.Txn = ba.Txn
711+
return br, nil
712+
})
713+
br, pErr = twb.SendLocked(ctx, ba)
714+
require.Nil(t, pErr)
715+
require.NotNil(t, br)
716+
require.Len(t, br.Responses, 1)
717+
require.Equal(t, int64(2), br.Responses[0].GetScan().NumKeys)
718+
719+
// Perform a read on keyC.
720+
ba = &kvpb.BatchRequest{}
721+
getC := &kvpb.GetRequest{RequestHeader: kvpb.RequestHeader{Key: keyC, Sequence: txn.Sequence}}
722+
ba.Add(getC)
723+
724+
numCalled = mockSender.NumCalled()
725+
br, pErr = twb.SendLocked(ctx, ba)
726+
require.Nil(t, pErr)
727+
require.NotNil(t, br)
728+
require.Len(t, br.Responses, 1)
729+
require.True(t, br.Responses[0].GetGet().Value.IsPresent())
730+
require.Equal(t, mockSender.NumCalled(), numCalled)
731+
}
732+
674733
// TestTxnWriteBufferServesOverlappingReadsCorrectly ensures that Scan and
675734
// ReverseScan requests that overlap with buffered writes are correctly served
676735
// from the buffer.

0 commit comments

Comments
 (0)