Skip to content

Commit 142f984

Browse files
committed
kvcoord: reset HasBufferedAllPrecedingWrites for multi-batch flush
HasBufferedAllPrecedingWrites should only be set when the write buffer _knows_ that no writes have been issued. In the case of a multi-batch flush, the second batch should not have HasBufferedAllPrecedingWrites set to true since the first batch contained writes. Fixes #153509 Release note: None
1 parent 33c0b2e commit 142f984

File tree

2 files changed

+46
-0
lines changed

2 files changed

+46
-0
lines changed

pkg/kv/kvclient/kvcoord/txn_interceptor_write_buffer.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1826,6 +1826,9 @@ func (twb *txnWriteBuffer) flushBufferAndSendBatch(
18261826
if err := requireAllFlushedRequestsProcessed(br.Responses); err != nil {
18271827
return nil, kvpb.NewError(err)
18281828
}
1829+
1830+
// We've written intents now, so this should be false.
1831+
ba.HasBufferedAllPrecedingWrites = false
18291832
ba.UpdateTxn(br.Txn)
18301833

18311834
return twb.wrapped.SendLocked(ctx, ba)

pkg/kv/kvclient/kvcoord/txn_interceptor_write_buffer_test.go

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2671,6 +2671,49 @@ func TestTxnWriteBufferHasBufferedAllPrecedingWrites(t *testing.T) {
26712671
}
26722672
}
26732673

2674+
func TestTxnWriteBufferHasBufferedAllPrecedingWritesSplitFlush(t *testing.T) {
2675+
ctx := context.Background()
2676+
twb, mockSender, _ := makeMockTxnWriteBuffer(ctx)
2677+
txn := makeTxnProto()
2678+
txn.Sequence = 1
2679+
keyA, keyB, keyC := roachpb.Key("a"), roachpb.Key("b"), roachpb.Key("c")
2680+
2681+
// This Put will be completely buffered.
2682+
ba := &kvpb.BatchRequest{Header: kvpb.Header{Txn: &txn}}
2683+
putA := putArgs(keyA, "valA", txn.Sequence)
2684+
ba.Add(putA)
2685+
2686+
br, pErr := twb.SendLocked(ctx, ba)
2687+
require.Nil(t, pErr)
2688+
require.NotNil(t, br)
2689+
2690+
// Send a DeleteRange with MaxSpanRequestKeys set which will force a flush
2691+
// using two batches. The first should have HasBufferedAllPrecedingWrites, the
2692+
// next should not.
2693+
reqCount := 0
2694+
mockSender.MockSend(func(ba *kvpb.BatchRequest) (*kvpb.BatchResponse, *kvpb.Error) {
2695+
require.Equal(t, reqCount == 0, ba.HasBufferedAllPrecedingWrites,
2696+
"HasBufferedAllPrecedingWrites expected on the first request (and only the first request")
2697+
reqCount++
2698+
br = ba.CreateReply()
2699+
br.Txn = ba.Txn
2700+
return br, nil
2701+
})
2702+
2703+
txn.Sequence++
2704+
ba = &kvpb.BatchRequest{Header: kvpb.Header{Txn: &txn}}
2705+
ba.MaxSpanRequestKeys = 10
2706+
ba.Add(delRangeArgs(keyB, keyC, txn.Sequence))
2707+
2708+
beforeCallCount := mockSender.NumCalled()
2709+
br, pErr = twb.SendLocked(ctx, ba)
2710+
require.Nil(t, pErr)
2711+
require.NotNil(t, br)
2712+
require.Equal(t, 2, mockSender.NumCalled()-beforeCallCount)
2713+
require.Len(t, br.Responses, 1)
2714+
require.IsType(t, &kvpb.DeleteRangeResponse{}, br.Responses[0].GetInner())
2715+
}
2716+
26742717
// BenchmarkTxnWriteBuffer benchmarks the txnWriteBuffer. The test sets up a
26752718
// transaction with an existing buffer and runs a single batch through
26762719
// SendLocked and flushBufferAndSendBatch. The test varies the state of the

0 commit comments

Comments
 (0)