Skip to content

Commit fad25b4

Browse files
committed
kvclient: flush txnWriteBuffer if IncrementRequest is received
Previously we issued an error on IncrementRequest assuming this was only used in (1) transactions that have already disabled the write buffer because of DDL statements or (2) non-transactional requests. However, it is also used by any test that sets UseTransactionalDescIDGenerator to true. While we could call txn.SetBufferedWritesEnabled from the transactional ID generator itself, I opted to simply flush the buffer if we encounter an IncrementRequest so that IncrementRequest does not have a hidden requirement of disabling write buffering. Epic: none Release note: None
1 parent 70bb4fa commit fad25b4

File tree

2 files changed

+190
-129
lines changed

2 files changed

+190
-129
lines changed

pkg/kv/kvclient/kvcoord/txn_interceptor_write_buffer.go

Lines changed: 53 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -217,31 +217,7 @@ func (twb *txnWriteBuffer) SendLocked(
217217
return twb.flushBufferAndSendBatch(ctx, ba)
218218
}
219219

220-
if _, ok := ba.GetArg(kvpb.DeleteRange); ok {
221-
log.VEventf(ctx, 2, "DeleteRangeRequest forcing flush of write buffer")
222-
// DeleteRange requests can delete an arbitrary number of keys over a
223-
// given keyspan. We won't know the exact scope of the delete until
224-
// we've scanned the keyspan, which must happen on the server. We've got
225-
// a couple of options here:
226-
// 1. We decompose the DeleteRange request into a (potentially locking)
227-
// Scan followed by buffered point Deletes for each key in the scan's
228-
// result.
229-
// 2. We flush the buffer[1] and send the DeleteRange request to the KV
230-
// layer.
231-
//
232-
// We choose option 2, as typically the number of keys deleted is large,
233-
// and we may realize we're over budget after performing the initial
234-
// scan of the keyspan. At that point, we'll have to flush the buffer
235-
// anyway. Moreover, buffered writes are most impactful when a
236-
// transaction is writing to a small number of keys. As such, it's fine
237-
// to not optimize the DeleteRange case, as typically it results in a
238-
// large writing transaction.
239-
//
240-
// [1] Technically, we only need to flush the overlapping portion of the
241-
// buffer. However, for simplicity, the txnWriteBuffer doesn't support
242-
// transactions with partially buffered writes and partially flushed
243-
// writes. We could change this in the future if there's benefit to
244-
// doing so.
220+
if twb.batchRequiresFlush(ctx, ba) {
245221
return twb.flushBufferAndSendBatch(ctx, ba)
246222
}
247223

@@ -295,6 +271,55 @@ func (twb *txnWriteBuffer) SendLocked(
295271
return twb.mergeResponseWithTransformations(ctx, ts, br)
296272
}
297273

274+
func (twb *txnWriteBuffer) batchRequiresFlush(ctx context.Context, ba *kvpb.BatchRequest) bool {
275+
for _, ru := range ba.Requests {
276+
req := ru.GetInner()
277+
switch req.(type) {
278+
case *kvpb.IncrementRequest:
279+
// We don't typically see IncrementRequest in transactional batches that
280+
// haven't already had write buffering disabled becuase of DDL statements.
281+
//
282+
// However, we do have at least a few users of the NewTransactionalGenerator
283+
// in test code and builtins.
284+
//
285+
// We could handle this similar to how we handle ConditionalPut, but its
286+
// not clear there is much value in that.
287+
log.VEventf(ctx, 2, "%s forcing flush of write buffer", req.Method())
288+
return true
289+
case *kvpb.DeleteRangeRequest:
290+
// DeleteRange requests can delete an arbitrary number of keys over a
291+
// given keyspan. We won't know the exact scope of the delete until
292+
// we've scanned the keyspan, which must happen on the server. We've got
293+
// a couple of options here:
294+
//
295+
// 1. We decompose the DeleteRange request into a (potentially
296+
// locking) Scan followed by buffered point Deletes for each
297+
// key in the scan's result.
298+
//
299+
// 2. We flush the buffer[1] and send the DeleteRange request to
300+
// the KV layer.
301+
//
302+
// We choose option 2, as typically the number of keys deleted is large,
303+
// and we may realize we're over budget after performing the initial
304+
// scan of the keyspan. At that point, we'll have to flush the buffer
305+
// anyway. Moreover, buffered writes are most impactful when a
306+
// transaction is writing to a small number of keys. As such, it's fine
307+
// to not optimize the DeleteRange case, as typically it results in a
308+
// large writing transaction.
309+
//
310+
// [1] Technically, we only need to flush the overlapping portion of the
311+
// buffer. However, for simplicity, the txnWriteBuffer doesn't support
312+
// transactions with partially buffered writes and partially flushed
313+
// writes. We could change this in the future if there's benefit to
314+
// doing so.
315+
316+
log.VEventf(ctx, 2, "%s forcing flush of write buffer", req.Method())
317+
return true
318+
}
319+
}
320+
return false
321+
}
322+
298323
// validateBatch returns an error if the batch is unsupported
299324
// by the txnWriteBuffer.
300325
func (twb *txnWriteBuffer) validateBatch(ba *kvpb.BatchRequest) error {
@@ -353,9 +378,9 @@ func (twb *txnWriteBuffer) validateRequests(ba *kvpb.BatchRequest) error {
353378
}
354379
case *kvpb.QueryLocksRequest, *kvpb.LeaseInfoRequest:
355380
default:
356-
// All other requests are unsupported. Note that we assume EndTxn and
357-
// DeleteRange requests were handled explicitly before this method was
358-
// called.
381+
// All other requests are unsupported. Note that we assume that requests
382+
// that should result in a buffer flush are handled explicitly before this
383+
// method was called.
359384
return unsupportedMethodError(t.Method())
360385
}
361386
}

pkg/kv/kvclient/kvcoord/txn_interceptor_write_buffer_test.go

Lines changed: 137 additions & 101 deletions
Original file line numberDiff line numberDiff line change
@@ -1508,118 +1508,164 @@ func TestTxnWriteBufferFlushesWhenOverBudget(t *testing.T) {
15081508
require.Equal(t, int64(1), twb.txnMetrics.TxnWriteBufferMemoryLimitExceeded.Count())
15091509
}
15101510

1511-
// TestTxnWriteBufferDeleteRange ensures that the txnWriteBuffer correctly
1512-
// handles DeleteRange requests. In particular, whenever we see a batch with a
1513-
// DeleteRange request, the write buffer is flushed and write buffering is
1514-
// turned off for subsequent requests.
1515-
func TestTxnWriteBufferDeleteRange(t *testing.T) {
1511+
// TestTxnWriteBufferFlushesIfBatchRequiresFlushing ensures that the
1512+
// txnWriteBuffer correctly handles requests that aren't currently
1513+
// supported by the txnWriteBuffer by flushing the buffer before
1514+
// processing the request.
1515+
func TestTxnWriteBufferFlushesIfBatchRequiresFlushing(t *testing.T) {
15161516
defer leaktest.AfterTest(t)()
15171517
defer log.Scope(t).Close(t)
15181518
ctx := context.Background()
1519-
twb, mockSender := makeMockTxnWriteBuffer(cluster.MakeClusterSettings())
1520-
1521-
txn := makeTxnProto()
1522-
txn.Sequence = 10
15231519
keyA, keyB, keyC := roachpb.Key("a"), roachpb.Key("b"), roachpb.Key("c")
15241520
valA := "valA"
15251521

1526-
ba := &kvpb.BatchRequest{}
1527-
ba.Header = kvpb.Header{Txn: &txn}
1528-
putA := putArgs(keyA, valA, txn.Sequence)
1529-
delC := delArgs(keyC, txn.Sequence)
1530-
ba.Add(putA)
1531-
ba.Add(delC)
1532-
1533-
numCalled := mockSender.NumCalled()
1534-
br, pErr := twb.SendLocked(ctx, ba)
1535-
require.Nil(t, pErr)
1536-
require.NotNil(t, br)
1537-
// All the requests should be buffered and not make it past the
1538-
// txnWriteBuffer. The response returned should be indistinguishable.
1539-
require.Equal(t, numCalled, mockSender.NumCalled())
1540-
require.Len(t, br.Responses, 2)
1541-
require.IsType(t, &kvpb.PutResponse{}, br.Responses[0].GetInner())
1542-
// Verify the Put was buffered correctly.
1543-
expBufferedWrites := []bufferedWrite{
1544-
makeBufferedWrite(keyA, makeBufferedValue("valA", 10)),
1545-
makeBufferedWrite(keyC, makeBufferedValue("", 10)),
1522+
type batchSendMock func(ba *kvpb.BatchRequest) (*kvpb.BatchResponse, *kvpb.Error)
1523+
type testCase struct {
1524+
name string
1525+
ba func(*kvpb.BatchRequest)
1526+
baSender func(*testing.T) batchSendMock
1527+
validateResp func(*testing.T, *kvpb.BatchResponse, *kvpb.Error)
15461528
}
1547-
require.Equal(t, expBufferedWrites, twb.testingBufferedWritesAsSlice())
1548-
1549-
// Send a DeleteRange request. This should result in the entire buffer
1550-
// being flushed. Note that we're flushing the delete to key C as well, even
1551-
// though it doesn't overlap with the DeleteRange request.
1552-
ba = &kvpb.BatchRequest{}
1553-
ba.Header = kvpb.Header{Txn: &txn}
1554-
delRange := delRangeArgs(keyA, keyB, txn.Sequence)
1555-
ba.Add(delRange)
15561529

1557-
mockSender.MockSend(func(ba *kvpb.BatchRequest) (*kvpb.BatchResponse, *kvpb.Error) {
1558-
require.Len(t, ba.Requests, 3)
1530+
testCases := []testCase{
1531+
{
1532+
name: "DeleteRange",
1533+
ba: func(b *kvpb.BatchRequest) {
1534+
b.Add(delRangeArgs(keyA, keyB, b.Txn.Sequence))
1535+
},
1536+
baSender: func(t *testing.T) batchSendMock {
1537+
return func(ba *kvpb.BatchRequest) (*kvpb.BatchResponse, *kvpb.Error) {
1538+
require.Len(t, ba.Requests, 3)
1539+
require.IsType(t, &kvpb.PutRequest{}, ba.Requests[0].GetInner())
1540+
require.IsType(t, &kvpb.DeleteRequest{}, ba.Requests[1].GetInner())
1541+
require.IsType(t, &kvpb.DeleteRangeRequest{}, ba.Requests[2].GetInner())
1542+
1543+
br := ba.CreateReply()
1544+
br.Txn = ba.Txn
1545+
return br, nil
1546+
}
1547+
},
1548+
validateResp: func(t *testing.T, br *kvpb.BatchResponse, pErr *kvpb.Error) {
1549+
require.Nil(t, pErr)
1550+
require.NotNil(t, br)
1551+
require.Len(t, br.Responses, 1)
1552+
require.IsType(t, &kvpb.DeleteRangeResponse{}, br.Responses[0].GetInner())
1553+
},
1554+
},
1555+
{
1556+
name: "Increment",
1557+
ba: func(b *kvpb.BatchRequest) {
1558+
b.Add(&kvpb.IncrementRequest{
1559+
RequestHeader: kvpb.RequestHeader{Key: keyA},
1560+
Increment: 1,
1561+
})
1562+
},
1563+
baSender: func(t *testing.T) batchSendMock {
1564+
return func(ba *kvpb.BatchRequest) (*kvpb.BatchResponse, *kvpb.Error) {
1565+
require.Len(t, ba.Requests, 3)
1566+
require.IsType(t, &kvpb.PutRequest{}, ba.Requests[0].GetInner())
1567+
require.IsType(t, &kvpb.DeleteRequest{}, ba.Requests[1].GetInner())
1568+
require.IsType(t, &kvpb.IncrementRequest{}, ba.Requests[2].GetInner())
1569+
1570+
br := ba.CreateReply()
1571+
br.Txn = ba.Txn
1572+
return br, nil
1573+
}
1574+
},
1575+
validateResp: func(t *testing.T, br *kvpb.BatchResponse, pErr *kvpb.Error) {
1576+
require.Nil(t, pErr)
1577+
require.NotNil(t, br)
1578+
require.Len(t, br.Responses, 1)
1579+
require.IsType(t, &kvpb.IncrementResponse{}, br.Responses[0].GetInner())
1580+
},
1581+
},
1582+
}
15591583

1560-
require.IsType(t, &kvpb.PutRequest{}, ba.Requests[0].GetInner())
1561-
require.IsType(t, &kvpb.DeleteRequest{}, ba.Requests[1].GetInner())
1562-
require.IsType(t, &kvpb.DeleteRangeRequest{}, ba.Requests[2].GetInner())
1584+
for _, tc := range testCases {
1585+
t.Run(tc.name, func(t *testing.T) {
1586+
twb, mockSender := makeMockTxnWriteBuffer(cluster.MakeClusterSettings())
15631587

1564-
br = ba.CreateReply()
1565-
br.Txn = ba.Txn
1566-
return br, nil
1567-
})
1588+
txn := makeTxnProto()
1589+
txn.Sequence = 10
1590+
ba := &kvpb.BatchRequest{}
1591+
ba.Header = kvpb.Header{Txn: &txn}
1592+
putA := putArgs(keyA, valA, txn.Sequence)
1593+
delC := delArgs(keyC, txn.Sequence)
1594+
ba.Add(putA)
1595+
ba.Add(delC)
15681596

1569-
br, pErr = twb.SendLocked(ctx, ba)
1570-
require.Nil(t, pErr)
1571-
require.NotNil(t, br)
1572-
// Even though we flushed some writes, it shouldn't make it back to the response.
1573-
require.Len(t, br.Responses, 1)
1574-
require.IsType(t, &kvpb.DeleteRangeResponse{}, br.Responses[0].GetInner())
1575-
require.Equal(t, int64(1), twb.txnMetrics.TxnWriteBufferDisabledAfterBuffering.Count())
1597+
numCalled := mockSender.NumCalled()
1598+
br, pErr := twb.SendLocked(ctx, ba)
1599+
require.Nil(t, pErr)
1600+
require.NotNil(t, br)
1601+
// All the requests should be buffered and not make it past the
1602+
// txnWriteBuffer. The response returned should be indistinguishable.
1603+
require.Equal(t, numCalled, mockSender.NumCalled())
1604+
require.Len(t, br.Responses, 2)
1605+
require.IsType(t, &kvpb.PutResponse{}, br.Responses[0].GetInner())
1606+
// Verify the Put was buffered correctly.
1607+
expBufferedWrites := []bufferedWrite{
1608+
makeBufferedWrite(keyA, makeBufferedValue("valA", 10)),
1609+
makeBufferedWrite(keyC, makeBufferedValue("", 10)),
1610+
}
1611+
require.Equal(t, expBufferedWrites, twb.testingBufferedWritesAsSlice())
15761612

1577-
// Ensure the buffer is empty at this point.
1578-
require.Equal(t, 0, len(twb.testingBufferedWritesAsSlice()))
1613+
// Send the batch that should require a flush
1614+
ba = &kvpb.BatchRequest{}
1615+
ba.Header = kvpb.Header{Txn: &txn}
1616+
tc.ba(ba)
1617+
mockSender.MockSend(tc.baSender(t))
1618+
br, pErr = twb.SendLocked(ctx, ba)
1619+
tc.validateResp(t, br, pErr)
1620+
// Ensure the buffer is empty at this point.
1621+
require.Equal(t, 0, len(twb.testingBufferedWritesAsSlice()))
1622+
require.Equal(t, int64(1), twb.txnMetrics.TxnWriteBufferDisabledAfterBuffering.Count())
15791623

1580-
// Subsequent batches should not be buffered.
1581-
ba = &kvpb.BatchRequest{}
1582-
ba.Header = kvpb.Header{Txn: &txn}
1583-
putC := putArgs(keyC, valA, txn.Sequence)
1584-
ba.Add(putC)
1624+
// Subsequent batches should not be buffered.
1625+
ba = &kvpb.BatchRequest{}
1626+
ba.Header = kvpb.Header{Txn: &txn}
1627+
putC := putArgs(keyC, valA, txn.Sequence)
1628+
ba.Add(putC)
15851629

1586-
mockSender.MockSend(func(ba *kvpb.BatchRequest) (*kvpb.BatchResponse, *kvpb.Error) {
1587-
require.Len(t, ba.Requests, 1)
1630+
mockSender.MockSend(func(ba *kvpb.BatchRequest) (*kvpb.BatchResponse, *kvpb.Error) {
1631+
require.Len(t, ba.Requests, 1)
15881632

1589-
require.IsType(t, &kvpb.PutRequest{}, ba.Requests[0].GetInner())
1633+
require.IsType(t, &kvpb.PutRequest{}, ba.Requests[0].GetInner())
15901634

1591-
br = ba.CreateReply()
1592-
br.Txn = ba.Txn
1593-
return br, nil
1594-
})
1635+
br = ba.CreateReply()
1636+
br.Txn = ba.Txn
1637+
return br, nil
1638+
})
15951639

1596-
br, pErr = twb.SendLocked(ctx, ba)
1597-
require.Nil(t, pErr)
1598-
require.NotNil(t, br)
1599-
require.Len(t, br.Responses, 1)
1600-
require.IsType(t, &kvpb.PutResponse{}, br.Responses[0].GetInner())
1640+
br, pErr = twb.SendLocked(ctx, ba)
1641+
require.Nil(t, pErr)
1642+
require.NotNil(t, br)
1643+
require.Len(t, br.Responses, 1)
1644+
require.IsType(t, &kvpb.PutResponse{}, br.Responses[0].GetInner())
16011645

1602-
// Commit the transaction. We flushed the buffer already, and no subsequent
1603-
// writes were buffered, so the buffer should be empty. As such, no write
1604-
// requests should be added to the batch.
1605-
ba = &kvpb.BatchRequest{}
1606-
ba.Header = kvpb.Header{Txn: &txn}
1607-
ba.Add(&kvpb.EndTxnRequest{Commit: true})
1646+
// Commit the transaction. We flushed the buffer already, and no subsequent
1647+
// writes were buffered, so the buffer should be empty. As such, no write
1648+
// requests should be added to the batch.
1649+
ba = &kvpb.BatchRequest{}
1650+
ba.Header = kvpb.Header{Txn: &txn}
1651+
ba.Add(&kvpb.EndTxnRequest{Commit: true})
16081652

1609-
mockSender.MockSend(func(ba *kvpb.BatchRequest) (*kvpb.BatchResponse, *kvpb.Error) {
1610-
require.Len(t, ba.Requests, 1)
1611-
require.IsType(t, &kvpb.EndTxnRequest{}, ba.Requests[0].GetInner())
1653+
mockSender.MockSend(func(ba *kvpb.BatchRequest) (*kvpb.BatchResponse, *kvpb.Error) {
1654+
require.Len(t, ba.Requests, 1)
1655+
require.IsType(t, &kvpb.EndTxnRequest{}, ba.Requests[0].GetInner())
16121656

1613-
br = ba.CreateReply()
1614-
br.Txn = ba.Txn
1615-
return br, nil
1616-
})
1657+
br = ba.CreateReply()
1658+
br.Txn = ba.Txn
1659+
return br, nil
1660+
})
16171661

1618-
br, pErr = twb.SendLocked(ctx, ba)
1619-
require.Nil(t, pErr)
1620-
require.NotNil(t, br)
1621-
require.Len(t, br.Responses, 1)
1622-
require.IsType(t, &kvpb.EndTxnResponse{}, br.Responses[0].GetInner())
1662+
br, pErr = twb.SendLocked(ctx, ba)
1663+
require.Nil(t, pErr)
1664+
require.NotNil(t, br)
1665+
require.Len(t, br.Responses, 1)
1666+
require.IsType(t, &kvpb.EndTxnResponse{}, br.Responses[0].GetInner())
1667+
})
1668+
}
16231669
}
16241670

16251671
// TestTxnWriteBufferRollbackToSavepoint tests the savepoint rollback logic.
@@ -1915,16 +1961,6 @@ func TestTxnWriteBufferBatchRequestValidation(t *testing.T) {
19151961
return b
19161962
},
19171963
},
1918-
{
1919-
name: "batch with Increment",
1920-
ba: func() *kvpb.BatchRequest {
1921-
b := &kvpb.BatchRequest{Header: kvpb.Header{Txn: &txn}}
1922-
b.Add(&kvpb.IncrementRequest{
1923-
RequestHeader: kvpb.RequestHeader{Key: keyA, Sequence: txn.Sequence},
1924-
})
1925-
return b
1926-
},
1927-
},
19281964
{
19291965
name: "batch with ReturnRawMVCCValues Scan",
19301966
ba: func() *kvpb.BatchRequest {

0 commit comments

Comments
 (0)