Skip to content

Commit 26fc47b

Browse files
committed
kvcoord: fix txnWriteBuffer for batches with limits and Dels
Previously, the txnWriteBuffer was oblivious to the fact that some transformed requests might be returned incomplete due to limits set on the BatchRequest (either TargetBytes or MaxSpanRequestKeys), so it would incorrectly think that it had acquired locks on some keys when it hadn't. Usage from SQL was only exposed to the bug via the special delete-range fast path where we use point Dels (i.e. a stmt of the form `DELETE FROM t WHERE k IN (<ids>)` where there are gaps between `id`s), since it always sets a key limit of 600. This commit fixes this particular issue for Dels transformed into Gets and adds a couple of assertions that we don't see batches with CPuts and/or Puts with the limits set. Additionally, it adjusts the comment to indicate which requests are allowed in batches with limits. Given that this feature is disabled by default and in private preview, AND the bug is limited to the DELETE fast path when more than 600 keys are deleted, I decided to omit the release note. Release note: None
1 parent 65c856a commit 26fc47b

File tree

4 files changed

+118
-14
lines changed

4 files changed

+118
-14
lines changed

pkg/kv/kvclient/kvcoord/dist_sender.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1080,9 +1080,8 @@ func (ds *DistSender) initAndVerifyBatch(ctx context.Context, ba *kvpb.BatchRequ
10801080
}
10811081

10821082
if ba.MaxSpanRequestKeys != 0 || ba.TargetBytes != 0 {
1083-
// Verify that the batch contains only specific range requests or the
1084-
// EndTxnRequest. Verify that a batch with a ReverseScan only contains
1085-
// ReverseScan range requests.
1083+
// Verify that the batch contains only specific requests. Verify that a
1084+
// batch with a ReverseScan only contains ReverseScan range requests.
10861085
var foundForward, foundReverse bool
10871086
for _, req := range ba.Requests {
10881087
inner := req.GetInner()

pkg/kv/kvclient/kvcoord/txn_interceptor_write_buffer.go

Lines changed: 40 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -433,7 +433,15 @@ func (twb *txnWriteBuffer) validateRequests(ba *kvpb.BatchRequest) error {
433433
if t.OriginTimestamp.IsSet() {
434434
return unsupportedOptionError(t.Method(), "OriginTimestamp")
435435
}
436+
assertTrue(ba.MaxSpanRequestKeys == 0 && ba.TargetBytes == 0, "unexpectedly found CPut in a BatchRequest with a limit")
436437
case *kvpb.PutRequest:
438+
// TODO(yuzefovich): the DistSender allows Puts to be in batches
439+
// with limits, which can happen when we're forced to flush the
440+
// buffered Puts, and the batch we piggy-back on has a limit set.
441+
// However, SQL never constructs such a batch on its own, so we're
442+
// asserting the expectations from SQL. Figure out how to reconcile
443+
// this with more permissive DistSender-level checks.
444+
assertTrue(ba.MaxSpanRequestKeys == 0 && ba.TargetBytes == 0, "unexpectedly found Put in a BatchRequest with a limit")
437445
case *kvpb.DeleteRequest:
438446
case *kvpb.GetRequest:
439447
// ReturnRawMVCCValues is unsupported because we don't know how to serve
@@ -1379,6 +1387,11 @@ func (rr requestRecord) toResp(
13791387
// We only use the response from KV if there wasn't already a
13801388
// buffered value for this key that our transaction wrote
13811389
// previously.
1390+
// TODO(yuzefovich): for completeness, we should check whether
1391+
// ResumeSpan is non-nil, in which case the response from KV is
1392+
// incomplete. This can happen when MaxSpanRequestKeys and/or
1393+
// TargetBytes limits are set on the batch, and SQL currently
1394+
// doesn't do that for batches with CPuts.
13821395
val = br.GetInner().(*kvpb.GetResponse).Value
13831396
}
13841397

@@ -1410,6 +1423,11 @@ func (rr requestRecord) toResp(
14101423

14111424
case *kvpb.PutRequest:
14121425
var dla *bufferedDurableLockAcquisition
1426+
// TODO(yuzefovich): for completeness, we should check whether
1427+
// ResumeSpan is non-nil if we transformed the request, in which case
1428+
// the response from KV is incomplete. This can happen when
1429+
// MaxSpanRequestKeys and/or TargetBytes limits are set on the batch,
1430+
// and SQL currently doesn't do that for batches with Puts.
14131431
if rr.transformed && exclusionTimestampRequired {
14141432
dla = &bufferedDurableLockAcquisition{
14151433
str: lock.Exclusive,
@@ -1423,19 +1441,20 @@ func (rr requestRecord) toResp(
14231441
case *kvpb.DeleteRequest:
14241442
// To correctly populate FoundKey in the response, we must prefer any
14251443
// buffered values (if they exist).
1426-
var foundKey bool
1444+
var resp kvpb.DeleteResponse
14271445
val, _, served := twb.maybeServeRead(req.Key, req.Sequence)
14281446
if served {
14291447
log.VEventf(ctx, 2, "serving read portion of %s on key %s from the buffer", req.Method(), req.Key)
1430-
foundKey = val.IsPresent()
1448+
resp.FoundKey = val.IsPresent()
14311449
} else if rr.transformed {
14321450
// We sent a GetRequest to the KV layer to acquire an exclusive lock
14331451
// on the key, populate FoundKey using the response.
14341452
getResp := br.GetInner().(*kvpb.GetResponse)
14351453
if log.ExpensiveLogEnabled(ctx, 2) {
14361454
log.Eventf(ctx, "synthesizing DeleteResponse from GetResponse: %#v", getResp)
14371455
}
1438-
foundKey = getResp.Value.IsPresent()
1456+
resp.FoundKey = getResp.Value.IsPresent()
1457+
resp.ResumeSpan = getResp.ResumeSpan
14391458
} else {
14401459
// NB: If MustAcquireExclusiveLock wasn't set by the client then we
14411460
// eschew sending a Get request to the KV layer just to populate
@@ -1447,7 +1466,14 @@ func (rr requestRecord) toResp(
14471466
// TODO(arul): improve the FoundKey semantics to have callers opt
14481467
// into whether they care about the key being found. Alternatively,
14491468
// clarify the behaviour on DeleteRequest.
1450-
foundKey = false
1469+
resp.FoundKey = false
1470+
}
1471+
1472+
ru.MustSetInner(&resp)
1473+
if resp.ResumeSpan != nil {
1474+
// When the Get was incomplete, we haven't actually processed this
1475+
// Del, so we cannot buffer the write.
1476+
break
14511477
}
14521478

14531479
var dla *bufferedDurableLockAcquisition
@@ -1459,14 +1485,16 @@ func (rr requestRecord) toResp(
14591485
}
14601486
}
14611487

1462-
ru.MustSetInner(&kvpb.DeleteResponse{
1463-
FoundKey: foundKey,
1464-
})
14651488
twb.addToBuffer(req.Key, roachpb.Value{}, req.Sequence, req.KVNemesisSeq, dla)
14661489

14671490
case *kvpb.GetRequest:
14681491
val, _, served := twb.maybeServeRead(req.Key, req.Sequence)
14691492
if served {
1493+
// TODO(yuzefovich): we're effectively ignoring the limits of
1494+
// BatchRequest when serving the Get from the buffer. We should
1495+
// consider setting the ResumeSpan if a limit has already been
1496+
// reached by this point. This will force us to set ResumeSpan on
1497+
// all remaining requests in the batch.
14701498
getResp := &kvpb.GetResponse{}
14711499
if val.IsPresent() {
14721500
getResp.Value = val
@@ -2447,8 +2475,6 @@ func (s *respIter) startKey() roachpb.Key {
24472475
// For ReverseScans, the EndKey of the ResumeSpan is updated to indicate the
24482476
// start key for the "next" page, which is exactly the last key that was
24492477
// reverse-scanned for the current response.
2450-
// TODO(yuzefovich): we should have some unit tests that exercise the
2451-
// ResumeSpan case.
24522478
if s.resumeSpan != nil {
24532479
return s.resumeSpan.EndKey
24542480
}
@@ -2519,6 +2545,11 @@ func makeRespSizeHelper(it *respIter) respSizeHelper {
25192545
}
25202546

25212547
func (h *respSizeHelper) acceptBuffer(key roachpb.Key, value *roachpb.Value) {
2548+
// TODO(yuzefovich): we're effectively ignoring the limits of BatchRequest
2549+
// when serving the reads from the buffer. We should consider checking how
2550+
// many keys and bytes have already been included to see whether we've
2551+
// reached a limit, and set the ResumeSpan if so (which can result in some
2552+
// wasted work by the server).
25222553
h.numKeys++
25232554
lenKV, _ := encKVLength(key, value)
25242555
h.numBytes += int64(lenKV)

pkg/kv/kvclient/kvcoord/txn_interceptor_write_buffer_test.go

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1336,6 +1336,75 @@ func TestTxnWriteBufferRespectsMustAcquireExclusiveLock(t *testing.T) {
13361336
require.IsType(t, &kvpb.EndTxnResponse{}, br.Responses[0].GetInner())
13371337
}
13381338

1339+
// TestTxnWriteBufferResumeSpans verifies that the txnWriteBuffer behaves
1340+
// correctly in the presence of BatchRequest's limits that result in non-nil
1341+
// ResumeSpans.
1342+
// TODO(yuzefovich): extend the test for Scans and ReverseScans.
1343+
func TestTxnWriteBufferResumeSpans(t *testing.T) {
1344+
defer leaktest.AfterTest(t)()
1345+
defer log.Scope(t).Close(t)
1346+
ctx := context.Background()
1347+
twb, mockSender, _ := makeMockTxnWriteBuffer(ctx)
1348+
1349+
txn := makeTxnProto()
1350+
txn.Sequence = 1
1351+
keyA, keyB, keyC := roachpb.Key("a"), roachpb.Key("b"), roachpb.Key("c")
1352+
1353+
// Delete 3 keys while setting MaxSpanRequestKeys to 2 (only the first two
1354+
// Dels should be processed).
1355+
ba := &kvpb.BatchRequest{}
1356+
ba.Header = kvpb.Header{Txn: &txn, MaxSpanRequestKeys: 2}
1357+
for _, k := range []roachpb.Key{keyA, keyB, keyC} {
1358+
del := delArgs(k, txn.Sequence)
1359+
// Set MustAcquireExclusiveLock so that Del is transformed into Get.
1360+
del.MustAcquireExclusiveLock = true
1361+
ba.Add(del)
1362+
}
1363+
1364+
// Simulate a scenario where each transformed Get finds something and the
1365+
// limit is reached after the second Get.
1366+
mockSender.MockSend(func(ba *kvpb.BatchRequest) (*kvpb.BatchResponse, *kvpb.Error) {
1367+
require.Equal(t, int64(2), ba.MaxSpanRequestKeys)
1368+
require.Len(t, ba.Requests, 3)
1369+
require.IsType(t, &kvpb.GetRequest{}, ba.Requests[0].GetInner())
1370+
require.IsType(t, &kvpb.GetRequest{}, ba.Requests[1].GetInner())
1371+
require.IsType(t, &kvpb.GetRequest{}, ba.Requests[2].GetInner())
1372+
1373+
br := ba.CreateReply()
1374+
br.Txn = ba.Txn
1375+
br.Responses = []kvpb.ResponseUnion{
1376+
{Value: &kvpb.ResponseUnion_Get{
1377+
Get: &kvpb.GetResponse{Value: &roachpb.Value{RawBytes: []byte("a")}},
1378+
}},
1379+
{Value: &kvpb.ResponseUnion_Get{
1380+
Get: &kvpb.GetResponse{Value: &roachpb.Value{RawBytes: []byte("b")}},
1381+
}},
1382+
{Value: &kvpb.ResponseUnion_Get{
1383+
Get: &kvpb.GetResponse{ResponseHeader: kvpb.ResponseHeader{
1384+
ResumeSpan: &roachpb.Span{Key: keyC},
1385+
}},
1386+
}},
1387+
}
1388+
return br, nil
1389+
})
1390+
1391+
br, pErr := twb.SendLocked(ctx, ba)
1392+
require.Nil(t, pErr)
1393+
require.NotNil(t, br)
1394+
1395+
// Even though the txnWriteBuffer did not send any Del requests to the KV
1396+
// layer above, the responses should still be populated.
1397+
require.Len(t, br.Responses, 3)
1398+
require.Equal(t, &kvpb.DeleteResponse{FoundKey: true}, br.Responses[0].GetInner())
1399+
require.Equal(t, &kvpb.DeleteResponse{FoundKey: true}, br.Responses[1].GetInner())
1400+
// The last Del wasn't processed, so we should see the ResumeSpan set in the
1401+
// header.
1402+
require.NotNil(t, br.Responses[2].GetInner().(*kvpb.DeleteResponse).ResumeSpan)
1403+
1404+
// Verify that only two writes are buffered.
1405+
require.Equal(t, 2, len(twb.testingBufferedWritesAsSlice()))
1406+
}
1407+
13391408
// TestTxnWriteBufferMustSortBatchesBySequenceNumber verifies that flushed
13401409
// batches are sorted in sequence number order, as currently required by the txn
13411410
// pipeliner interceptor.

pkg/kv/kvpb/api.proto

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2797,11 +2797,16 @@ message Header {
27972797
// - RevertRangeRequest
27982798
// - ResolveIntentRangeRequest
27992799
// - QueryLocksRequest
2800+
// - IsSpanEmptyRequest
28002801
//
2801-
// The following two requests types are also allowed in the batch, although
2802-
// the limit has no effect on them:
2802+
// The following request types are also allowed in the batch, although the
2803+
// limit has no effect on them:
2804+
// - ExportRequest
28032805
// - QueryIntentRequest
28042806
// - EndTxnRequest
2807+
// - ResolveIntentRequest
2808+
// - DeleteRequest
2809+
// - PutRequest
28052810
//
28062811
// [*] DeleteRangeRequests are generally not allowed to be batched together
28072812
// with a commit (i.e. 1PC), except if Require1PC is also set. See #37457.

0 commit comments

Comments
 (0)