Skip to content

Commit 3a6c753

Browse files
craig[bot]arulajmani
andcommitted
Merge #143763
143763: kvclient: correctly handle DeleteRange requests in the txnWriteBuffer r=arulajmani a=arulajmani DeleteRange requests can delete an arbitrary number of keys over a keyspan. We can't know the number or size of these until we perform a scan, which must happen on the server. As a result, we've got two options: 1. We could decompose the DeleteRange request into a (possibly locking) scan, followed by buffered point Deletes from the returned result. 2. We could flush the buffer and send the DeleteRange request to KV. We choose the latter, for its simplicity. DeleteRange requests typically delete a large number of keys (which could cause us to go over budget) and are only used by SQL in very specific circumstances (typically, for tables without secondary indexes). For now, we choose simplicity even though it pesimizes the case where a DeleteRange request only affects a small number of keys. We could revisit this in the future. Closes #143423 Release note: None Co-authored-by: Arul Ajmani <[email protected]>
2 parents a5d7c25 + 66fd5f5 commit 3a6c753

File tree

4 files changed

+176
-1
lines changed

4 files changed

+176
-1
lines changed

pkg/kv/kvclient/kvcoord/dist_sender.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1088,7 +1088,7 @@ func (ds *DistSender) initAndVerifyBatch(ctx context.Context, ba *kvpb.BatchRequ
10881088
foundReverse = true
10891089

10901090
case *kvpb.QueryIntentRequest, *kvpb.EndTxnRequest,
1091-
*kvpb.GetRequest, *kvpb.ResolveIntentRequest, *kvpb.DeleteRequest:
1091+
*kvpb.GetRequest, *kvpb.ResolveIntentRequest, *kvpb.DeleteRequest, *kvpb.PutRequest:
10921092
// Accepted point requests that can be in batches with limit.
10931093

10941094
default:

pkg/kv/kvclient/kvcoord/txn_interceptor_write_buffer.go

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -181,6 +181,33 @@ func (twb *txnWriteBuffer) SendLocked(
181181
return twb.flushBufferAndSendBatch(ctx, ba)
182182
}
183183

184+
if _, ok := ba.GetArg(kvpb.DeleteRange); ok {
185+
// DeleteRange requests can delete an arbitrary number of keys over a
186+
// given keyspan. We won't know the exact scope of the delete until
187+
// we've scanned the keyspan, which must happen on the server. We've got
188+
// a couple of options here:
189+
// 1. We decompose the DeleteRange request into a (potentially locking)
190+
// Scan followed by buffered point Deletes for each key in the scan's
191+
// result.
192+
// 2. We flush the buffer[1] and send the DeleteRange request to the KV
193+
// layer.
194+
//
195+
// We choose option 2, as typically the number of keys deleted is large,
196+
// and we may realize we're over budget after performing the initial
197+
// scan of the keyspan. At that point, we'll have to flush the buffer
198+
// anyway. Moreover, buffered writes are most impactful when a
199+
// transaction is writing to a small number of keys. As such, it's fine
200+
// to not optimize the DeleteRange case, as typically it results in a
201+
// large writing transaction.
202+
//
203+
// [1] Technically, we only need to flush the overlapping portion of the
204+
// buffer. However, for simplicity, the txnWriteBuffer doesn't support
205+
// transactions with partially buffered writes and partially flushed
206+
// writes. We could change this in the future if there's benefit to
207+
// doing so.
208+
return twb.flushBufferAndSendBatch(ctx, ba)
209+
}
210+
184211
// Check if buffering writes from the supplied batch will run us over
185212
// budget. If it will, we shouldn't buffer writes from the current batch,
186213
// and flush the buffer.

pkg/kv/kvclient/kvcoord/txn_interceptor_write_buffer_test.go

Lines changed: 119 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,12 @@ func delArgs(key roachpb.Key, seq enginepb.TxnSeq) *kvpb.DeleteRequest {
5656
}
5757
}
5858

59+
func delRangeArgs(key, endKey roachpb.Key, seq enginepb.TxnSeq) *kvpb.DeleteRangeRequest {
60+
return &kvpb.DeleteRangeRequest{
61+
RequestHeader: kvpb.RequestHeader{Key: key, EndKey: endKey, Sequence: seq},
62+
}
63+
}
64+
5965
func makeBufferedWrite(key roachpb.Key, vals ...bufferedValue) bufferedWrite {
6066
return bufferedWrite{key: key, vals: vals}
6167
}
@@ -1495,3 +1501,116 @@ func TestTxnWriteBufferFlushesWhenOverBudget(t *testing.T) {
14951501
require.Len(t, br.Responses, 1)
14961502
require.IsType(t, &kvpb.EndTxnResponse{}, br.Responses[0].GetInner())
14971503
}
1504+
1505+
// TestTxnWriteBufferDeleteRange ensures that the txnWriteBuffer correctly
1506+
// handles DeleteRange requests. In particular, whenever we see a batch with a
1507+
// DeleteRange request, the write buffer is flushed and write buffering is
1508+
// turned off for subsequent requests.
1509+
func TestTxnWriteBufferDeleteRange(t *testing.T) {
1510+
defer leaktest.AfterTest(t)()
1511+
defer log.Scope(t).Close(t)
1512+
ctx := context.Background()
1513+
twb, mockSender := makeMockTxnWriteBuffer(cluster.MakeClusterSettings())
1514+
1515+
txn := makeTxnProto()
1516+
txn.Sequence = 10
1517+
keyA, keyB, keyC := roachpb.Key("a"), roachpb.Key("b"), roachpb.Key("c")
1518+
valA := "valA"
1519+
1520+
ba := &kvpb.BatchRequest{}
1521+
ba.Header = kvpb.Header{Txn: &txn}
1522+
putA := putArgs(keyA, valA, txn.Sequence)
1523+
delC := delArgs(keyC, txn.Sequence)
1524+
ba.Add(putA)
1525+
ba.Add(delC)
1526+
1527+
numCalled := mockSender.NumCalled()
1528+
br, pErr := twb.SendLocked(ctx, ba)
1529+
require.Nil(t, pErr)
1530+
require.NotNil(t, br)
1531+
// All the requests should be buffered and not make it past the
1532+
// txnWriteBuffer. The response returned should be indistinguishable.
1533+
require.Equal(t, numCalled, mockSender.NumCalled())
1534+
require.Len(t, br.Responses, 2)
1535+
require.IsType(t, &kvpb.PutResponse{}, br.Responses[0].GetInner())
1536+
// Verify the Put was buffered correctly.
1537+
expBufferedWrites := []bufferedWrite{
1538+
makeBufferedWrite(keyA, makeBufferedValue("valA", 10)),
1539+
makeBufferedWrite(keyC, makeBufferedValue("", 10)),
1540+
}
1541+
require.Equal(t, expBufferedWrites, twb.testingBufferedWritesAsSlice())
1542+
1543+
// Send a DeleteRange request. This should result in the entire buffer
1544+
// being flushed. Note that we're flushing the delete to key C as well, even
1545+
// though it doesn't overlap with the DeleteRange request.
1546+
ba = &kvpb.BatchRequest{}
1547+
ba.Header = kvpb.Header{Txn: &txn}
1548+
delRange := delRangeArgs(keyA, keyB, txn.Sequence)
1549+
ba.Add(delRange)
1550+
1551+
mockSender.MockSend(func(ba *kvpb.BatchRequest) (*kvpb.BatchResponse, *kvpb.Error) {
1552+
require.Len(t, ba.Requests, 3)
1553+
1554+
require.IsType(t, &kvpb.PutRequest{}, ba.Requests[0].GetInner())
1555+
require.IsType(t, &kvpb.DeleteRequest{}, ba.Requests[1].GetInner())
1556+
require.IsType(t, &kvpb.DeleteRangeRequest{}, ba.Requests[2].GetInner())
1557+
1558+
br = ba.CreateReply()
1559+
br.Txn = ba.Txn
1560+
return br, nil
1561+
})
1562+
1563+
br, pErr = twb.SendLocked(ctx, ba)
1564+
require.Nil(t, pErr)
1565+
require.NotNil(t, br)
1566+
// Even though we flushed some writes, it shouldn't make it back to the response.
1567+
require.Len(t, br.Responses, 1)
1568+
require.IsType(t, &kvpb.DeleteRangeResponse{}, br.Responses[0].GetInner())
1569+
1570+
// Ensure the buffer is empty at this point.
1571+
require.Equal(t, 0, len(twb.testingBufferedWritesAsSlice()))
1572+
1573+
// Subsequent batches should not be buffered.
1574+
ba = &kvpb.BatchRequest{}
1575+
ba.Header = kvpb.Header{Txn: &txn}
1576+
putC := putArgs(keyC, valA, txn.Sequence)
1577+
ba.Add(putC)
1578+
1579+
mockSender.MockSend(func(ba *kvpb.BatchRequest) (*kvpb.BatchResponse, *kvpb.Error) {
1580+
require.Len(t, ba.Requests, 1)
1581+
1582+
require.IsType(t, &kvpb.PutRequest{}, ba.Requests[0].GetInner())
1583+
1584+
br = ba.CreateReply()
1585+
br.Txn = ba.Txn
1586+
return br, nil
1587+
})
1588+
1589+
br, pErr = twb.SendLocked(ctx, ba)
1590+
require.Nil(t, pErr)
1591+
require.NotNil(t, br)
1592+
require.Len(t, br.Responses, 1)
1593+
require.IsType(t, &kvpb.PutResponse{}, br.Responses[0].GetInner())
1594+
1595+
// Commit the transaction. We flushed the buffer already, and no subsequent
1596+
// writes were buffered, so the buffer should be empty. As such, no write
1597+
// requests should be added to the batch.
1598+
ba = &kvpb.BatchRequest{}
1599+
ba.Header = kvpb.Header{Txn: &txn}
1600+
ba.Add(&kvpb.EndTxnRequest{Commit: true})
1601+
1602+
mockSender.MockSend(func(ba *kvpb.BatchRequest) (*kvpb.BatchResponse, *kvpb.Error) {
1603+
require.Len(t, ba.Requests, 1)
1604+
require.IsType(t, &kvpb.EndTxnRequest{}, ba.Requests[0].GetInner())
1605+
1606+
br = ba.CreateReply()
1607+
br.Txn = ba.Txn
1608+
return br, nil
1609+
})
1610+
1611+
br, pErr = twb.SendLocked(ctx, ba)
1612+
require.Nil(t, pErr)
1613+
require.NotNil(t, br)
1614+
require.Len(t, br.Responses, 1)
1615+
require.IsType(t, &kvpb.EndTxnResponse{}, br.Responses[0].GetInner())
1616+
}

pkg/sql/logictest/testdata/logic_test/buffered_writes

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -126,3 +126,32 @@ query I
126126
SELECT * FROM t2
127127
----
128128
1
129+
130+
# Ensure that DeleteRange requests work correctly with buffered writes. In
131+
# particular, a DeleteRange request results in a buffer flush.
132+
133+
statement ok
134+
CREATE TABLE t3 (k INT PRIMARY KEY)
135+
136+
statement ok
137+
INSERT INTO t3 VALUES (1)
138+
139+
statement ok
140+
BEGIN
141+
142+
statement ok
143+
INSERT INTO t3 VALUES (2)
144+
145+
statement count 0
146+
DELETE FROM t3 WHERE k = 3
147+
148+
statement count 2
149+
DELETE FROM t3 WHERE k < 10 AND k > 0
150+
151+
statement ok
152+
COMMIT
153+
154+
query I
155+
SELECT count(*) from t3
156+
----
157+
0

0 commit comments

Comments
 (0)