Skip to content

Commit 4501376

Browse files
committed
kvcoord: fix txnWriteBuffer for batches with limits and Dels
Previously, the txnWriteBuffer was oblivious to the fact that some transformed requests might be returned incomplete due to limits set on the BatchRequest (either TargetBytes or MaxSpanRequestKeys), so it would incorrectly think that it has acquired locks on some keys when it hasn't. Usage from SQL was only exposed to the bug via special delete-range fast-path where we used point Dels (i.e. a stmt of the form `DELETE FROM t WHERE k IN (<ids>)` where there are gaps between `id`s) since it always sets a key limit of 600. This commit fixes this particular issue for Dels transformed into Gets and adds a couple of assertions that we don't see batches with CPuts and/or Puts with the limits set. Additionally, it adjusts the comment to indicate which requests are allowed in batches with limits. Given that this feature is disabled by default and in the private preview AND it's limited to the DELETE fast-path when more than 600 keys are deleted, I decided to omit the release note. Release note: None
1 parent 09786bc commit 4501376

File tree

4 files changed

+118
-14
lines changed

4 files changed

+118
-14
lines changed

pkg/kv/kvclient/kvcoord/dist_sender.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1080,9 +1080,8 @@ func (ds *DistSender) initAndVerifyBatch(ctx context.Context, ba *kvpb.BatchRequ
10801080
}
10811081

10821082
if ba.MaxSpanRequestKeys != 0 || ba.TargetBytes != 0 {
1083-
// Verify that the batch contains only specific range requests or the
1084-
// EndTxnRequest. Verify that a batch with a ReverseScan only contains
1085-
// ReverseScan range requests.
1083+
// Verify that the batch contains only specific requests. Verify that a
1084+
// batch with a ReverseScan only contains ReverseScan range requests.
10861085
var foundForward, foundReverse bool
10871086
for _, req := range ba.Requests {
10881087
inner := req.GetInner()

pkg/kv/kvclient/kvcoord/txn_interceptor_write_buffer.go

Lines changed: 40 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -423,7 +423,15 @@ func (twb *txnWriteBuffer) validateRequests(ba *kvpb.BatchRequest) error {
423423
if t.OriginTimestamp.IsSet() {
424424
return unsupportedOptionError(t.Method(), "OriginTimestamp")
425425
}
426+
assertTrue(ba.MaxSpanRequestKeys == 0 && ba.TargetBytes == 0, "unexpectedly found CPut in a BatchRequest with a limit")
426427
case *kvpb.PutRequest:
428+
// TODO(yuzefovich): the DistSender allows Puts to be in batches
429+
// with limits, which can happen when we're forced to flush the
430+
// buffered Puts, and the batch we piggy-back on has a limit set.
431+
// However, SQL never constructs such a batch on its own, so we're
432+
// asserting the expectations from SQL. Figure out how to reconcile
433+
// this with more permissive DistSender-level checks.
434+
assertTrue(ba.MaxSpanRequestKeys == 0 && ba.TargetBytes == 0, "unexpectedly found Put in a BatchRequest with a limit")
427435
case *kvpb.DeleteRequest:
428436
case *kvpb.GetRequest:
429437
// ReturnRawMVCCValues is unsupported because we don't know how to serve
@@ -1366,6 +1374,11 @@ func (rr requestRecord) toResp(
13661374
// We only use the response from KV if there wasn't already a
13671375
// buffered value for this key that our transaction wrote
13681376
// previously.
1377+
// TODO(yuzefovich): for completeness, we should check whether
1378+
// ResumeSpan is non-nil, in which case the response from KV is
1379+
// incomplete. This can happen when MaxSpanRequestKeys and/or
1380+
// TargetBytes limits are set on the batch, and SQL currently
1381+
// doesn't do that for batches with CPuts.
13691382
val = br.GetInner().(*kvpb.GetResponse).Value
13701383
}
13711384

@@ -1397,6 +1410,11 @@ func (rr requestRecord) toResp(
13971410

13981411
case *kvpb.PutRequest:
13991412
var dla *bufferedDurableLockAcquisition
1413+
// TODO(yuzefovich): for completeness, we should check whether
1414+
// ResumeSpan is non-nil if we transformed the request, in which case
1415+
// the response from KV is incomplete. This can happen when
1416+
// MaxSpanRequestKeys and/or TargetBytes limits are set on the batch,
1417+
// and SQL currently doesn't do that for batches with Puts.
14001418
if rr.transformed && exclusionTimestampRequired {
14011419
dla = &bufferedDurableLockAcquisition{
14021420
str: lock.Exclusive,
@@ -1410,19 +1428,20 @@ func (rr requestRecord) toResp(
14101428
case *kvpb.DeleteRequest:
14111429
// To correctly populate FoundKey in the response, we must prefer any
14121430
// buffered values (if they exist).
1413-
var foundKey bool
1431+
var resp kvpb.DeleteResponse
14141432
val, _, served := twb.maybeServeRead(req.Key, req.Sequence)
14151433
if served {
14161434
log.VEventf(ctx, 2, "serving read portion of %s on key %s from the buffer", req.Method(), req.Key)
1417-
foundKey = val.IsPresent()
1435+
resp.FoundKey = val.IsPresent()
14181436
} else if rr.transformed {
14191437
// We sent a GetRequest to the KV layer to acquire an exclusive lock
14201438
// on the key, populate FoundKey using the response.
14211439
getResp := br.GetInner().(*kvpb.GetResponse)
14221440
if log.ExpensiveLogEnabled(ctx, 2) {
14231441
log.Eventf(ctx, "synthesizing DeleteResponse from GetResponse: %#v", getResp)
14241442
}
1425-
foundKey = getResp.Value.IsPresent()
1443+
resp.FoundKey = getResp.Value.IsPresent()
1444+
resp.ResumeSpan = getResp.ResumeSpan
14261445
} else {
14271446
// NB: If MustAcquireExclusiveLock wasn't set by the client then we
14281447
// eschew sending a Get request to the KV layer just to populate
@@ -1434,7 +1453,14 @@ func (rr requestRecord) toResp(
14341453
// TODO(arul): improve the FoundKey semantics to have callers opt
14351454
// into whether the care about the key being found. Alternatively,
14361455
// clarify the behaviour on DeleteRequest.
1437-
foundKey = false
1456+
resp.FoundKey = false
1457+
}
1458+
1459+
ru.MustSetInner(&resp)
1460+
if resp.ResumeSpan != nil {
1461+
// When the Get was incomplete, we haven't actually processed this
1462+
// Del, so we cannot buffer the write.
1463+
break
14381464
}
14391465

14401466
var dla *bufferedDurableLockAcquisition
@@ -1446,14 +1472,16 @@ func (rr requestRecord) toResp(
14461472
}
14471473
}
14481474

1449-
ru.MustSetInner(&kvpb.DeleteResponse{
1450-
FoundKey: foundKey,
1451-
})
14521475
twb.addToBuffer(req.Key, roachpb.Value{}, req.Sequence, req.KVNemesisSeq, dla)
14531476

14541477
case *kvpb.GetRequest:
14551478
val, _, served := twb.maybeServeRead(req.Key, req.Sequence)
14561479
if served {
1480+
// TODO(yuzefovich): we're effectively ignoring the limits of
1481+
// BatchRequest when serving the Get from the buffer. We should
1482+
// consider setting the ResumeSpan if a limit has already been
1483+
// reached by this point. This will force us to set ResumeSpan on
1484+
// all remaining requests in the batch.
14571485
getResp := &kvpb.GetResponse{}
14581486
if val.IsPresent() {
14591487
getResp.Value = val
@@ -2403,8 +2431,6 @@ func (s *respIter) startKey() roachpb.Key {
24032431
// For ReverseScans, the EndKey of the ResumeSpan is updated to indicate the
24042432
// start key for the "next" page, which is exactly the last key that was
24052433
// reverse-scanned for the current response.
2406-
// TODO(yuzefovich): we should have some unit tests that exercise the
2407-
// ResumeSpan case.
24082434
if s.resumeSpan != nil {
24092435
return s.resumeSpan.EndKey
24102436
}
@@ -2475,6 +2501,11 @@ func makeRespSizeHelper(it *respIter) respSizeHelper {
24752501
}
24762502

24772503
func (h *respSizeHelper) acceptBuffer(key roachpb.Key, value *roachpb.Value) {
2504+
// TODO(yuzefovich): we're effectively ignoring the limits of BatchRequest
2505+
// when serving the reads from the buffer. We should consider checking how
2506+
// many keys and bytes have already been included to see whether we've
2507+
// reached a limit, and set the ResumeSpan if so (which can result in some
2508+
// wasted work by the server).
24782509
h.numKeys++
24792510
lenKV, _ := encKVLength(key, value)
24802511
h.numBytes += int64(lenKV)

pkg/kv/kvclient/kvcoord/txn_interceptor_write_buffer_test.go

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1331,6 +1331,75 @@ func TestTxnWriteBufferRespectsMustAcquireExclusiveLock(t *testing.T) {
13311331
require.IsType(t, &kvpb.EndTxnResponse{}, br.Responses[0].GetInner())
13321332
}
13331333

1334+
// TestTxnWriteBufferResumeSpans verifies that the txnWriteBuffer behaves
1335+
// correctly in presence of BatchRequest's limits that result in non-nil
1336+
// ResumeSpans.
1337+
// TODO(yuzefovich): extend the test for Scans and ReverseScans.
1338+
func TestTxnWriteBufferResumeSpans(t *testing.T) {
1339+
defer leaktest.AfterTest(t)()
1340+
defer log.Scope(t).Close(t)
1341+
ctx := context.Background()
1342+
twb, mockSender, _ := makeMockTxnWriteBuffer(ctx)
1343+
1344+
txn := makeTxnProto()
1345+
txn.Sequence = 1
1346+
keyA, keyB, keyC := roachpb.Key("a"), roachpb.Key("b"), roachpb.Key("c")
1347+
1348+
// Delete 3 keys while setting MaxSpanRequestKeys to 2 (only the first two
1349+
// Dels should be processed).
1350+
ba := &kvpb.BatchRequest{}
1351+
ba.Header = kvpb.Header{Txn: &txn, MaxSpanRequestKeys: 2}
1352+
for _, k := range []roachpb.Key{keyA, keyB, keyC} {
1353+
del := delArgs(k, txn.Sequence)
1354+
// Set MustAcquireExclusiveLock so that Del is transformed into Get.
1355+
del.MustAcquireExclusiveLock = true
1356+
ba.Add(del)
1357+
}
1358+
1359+
// Simulate a scenario where each transformed Get finds something and the
1360+
// limit is reached after the second Get.
1361+
mockSender.MockSend(func(ba *kvpb.BatchRequest) (*kvpb.BatchResponse, *kvpb.Error) {
1362+
require.Equal(t, int64(2), ba.MaxSpanRequestKeys)
1363+
require.Len(t, ba.Requests, 3)
1364+
require.IsType(t, &kvpb.GetRequest{}, ba.Requests[0].GetInner())
1365+
require.IsType(t, &kvpb.GetRequest{}, ba.Requests[1].GetInner())
1366+
require.IsType(t, &kvpb.GetRequest{}, ba.Requests[2].GetInner())
1367+
1368+
br := ba.CreateReply()
1369+
br.Txn = ba.Txn
1370+
br.Responses = []kvpb.ResponseUnion{
1371+
{Value: &kvpb.ResponseUnion_Get{
1372+
Get: &kvpb.GetResponse{Value: &roachpb.Value{RawBytes: []byte("a")}},
1373+
}},
1374+
{Value: &kvpb.ResponseUnion_Get{
1375+
Get: &kvpb.GetResponse{Value: &roachpb.Value{RawBytes: []byte("b")}},
1376+
}},
1377+
{Value: &kvpb.ResponseUnion_Get{
1378+
Get: &kvpb.GetResponse{ResponseHeader: kvpb.ResponseHeader{
1379+
ResumeSpan: &roachpb.Span{Key: keyC},
1380+
}},
1381+
}},
1382+
}
1383+
return br, nil
1384+
})
1385+
1386+
br, pErr := twb.SendLocked(ctx, ba)
1387+
require.Nil(t, pErr)
1388+
require.NotNil(t, br)
1389+
1390+
// Even though the txnWriteBuffer did not send any Del requests to the KV
1391+
// layer above, the responses should still be populated.
1392+
require.Len(t, br.Responses, 3)
1393+
require.Equal(t, &kvpb.DeleteResponse{FoundKey: true}, br.Responses[0].GetInner())
1394+
require.Equal(t, &kvpb.DeleteResponse{FoundKey: true}, br.Responses[1].GetInner())
1395+
// The last Del wasn't processed, so we should see the ResumeSpan set in the
1396+
// header.
1397+
require.NotNil(t, br.Responses[2].GetInner().(*kvpb.DeleteResponse).ResumeSpan)
1398+
1399+
// Verify that only two writes are buffered.
1400+
require.Equal(t, 2, len(twb.testingBufferedWritesAsSlice()))
1401+
}
1402+
13341403
// TestTxnWriteBufferMustSortBatchesBySequenceNumber verifies that flushed
13351404
// batches are sorted in sequence number order, as currently required by the txn
13361405
// pipeliner interceptor.

pkg/kv/kvpb/api.proto

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2779,11 +2779,16 @@ message Header {
27792779
// - RevertRangeRequest
27802780
// - ResolveIntentRangeRequest
27812781
// - QueryLocksRequest
2782+
// - IsSpanEmptyRequest
27822783
//
2783-
// The following two requests types are also allowed in the batch, although
2784-
// the limit has no effect on them:
2784+
// The following requests types are also allowed in the batch, although the
2785+
// limit has no effect on them:
2786+
// - ExportRequest
27852787
// - QueryIntentRequest
27862788
// - EndTxnRequest
2789+
// - ResolveIntentRequest
2790+
// - DeleteRequest
2791+
// - PutRequest
27872792
//
27882793
// [*] DeleteRangeRequests are generally not allowed to be batched together
27892794
// with a commit (i.e. 1PC), except if Require1PC is also set. See #37457.

0 commit comments

Comments
 (0)