Skip to content

Commit 9e38ea1

Browse files
authored
Merge pull request #151763 from yuzefovich/blathers/backport-release-25.3-151559
release-25.3: kvcoord: fix txnWriteBuffer for batches with limits and Dels
2 parents d86a2e7 + 4501376 commit 9e38ea1

File tree

4 files changed

+118
-14
lines changed

4 files changed

+118
-14
lines changed

pkg/kv/kvclient/kvcoord/dist_sender.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1080,9 +1080,8 @@ func (ds *DistSender) initAndVerifyBatch(ctx context.Context, ba *kvpb.BatchRequ
10801080
}
10811081

10821082
if ba.MaxSpanRequestKeys != 0 || ba.TargetBytes != 0 {
1083-
// Verify that the batch contains only specific range requests or the
1084-
// EndTxnRequest. Verify that a batch with a ReverseScan only contains
1085-
// ReverseScan range requests.
1083+
// Verify that the batch contains only specific requests. Verify that a
1084+
// batch with a ReverseScan only contains ReverseScan range requests.
10861085
var foundForward, foundReverse bool
10871086
for _, req := range ba.Requests {
10881087
inner := req.GetInner()

pkg/kv/kvclient/kvcoord/txn_interceptor_write_buffer.go

Lines changed: 40 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -440,7 +440,15 @@ func (twb *txnWriteBuffer) validateRequests(ba *kvpb.BatchRequest) error {
440440
if t.OriginTimestamp.IsSet() {
441441
return unsupportedOptionError(t.Method(), "OriginTimestamp")
442442
}
443+
assertTrue(ba.MaxSpanRequestKeys == 0 && ba.TargetBytes == 0, "unexpectedly found CPut in a BatchRequest with a limit")
443444
case *kvpb.PutRequest:
445+
// TODO(yuzefovich): the DistSender allows Puts to be in batches
446+
// with limits, which can happen when we're forced to flush the
447+
// buffered Puts, and the batch we piggy-back on has a limit set.
448+
// However, SQL never constructs such a batch on its own, so we're
449+
// asserting the expectations from SQL. Figure out how to reconcile
450+
// this with more permissive DistSender-level checks.
451+
assertTrue(ba.MaxSpanRequestKeys == 0 && ba.TargetBytes == 0, "unexpectedly found Put in a BatchRequest with a limit")
444452
case *kvpb.DeleteRequest:
445453
case *kvpb.GetRequest:
446454
// ReturnRawMVCCValues is unsupported because we don't know how to serve
@@ -1383,6 +1391,11 @@ func (rr requestRecord) toResp(
13831391
// We only use the response from KV if there wasn't already a
13841392
// buffered value for this key that our transaction wrote
13851393
// previously.
1394+
// TODO(yuzefovich): for completeness, we should check whether
1395+
// ResumeSpan is non-nil, in which case the response from KV is
1396+
// incomplete. This can happen when MaxSpanRequestKeys and/or
1397+
// TargetBytes limits are set on the batch, and SQL currently
1398+
// doesn't do that for batches with CPuts.
13861399
val = br.GetInner().(*kvpb.GetResponse).Value
13871400
}
13881401

@@ -1414,6 +1427,11 @@ func (rr requestRecord) toResp(
14141427

14151428
case *kvpb.PutRequest:
14161429
var dla *bufferedDurableLockAcquisition
1430+
// TODO(yuzefovich): for completeness, we should check whether
1431+
// ResumeSpan is non-nil if we transformed the request, in which case
1432+
// the response from KV is incomplete. This can happen when
1433+
// MaxSpanRequestKeys and/or TargetBytes limits are set on the batch,
1434+
// and SQL currently doesn't do that for batches with Puts.
14171435
if rr.transformed && exclusionTimestampRequired {
14181436
dla = &bufferedDurableLockAcquisition{
14191437
str: lock.Exclusive,
@@ -1427,19 +1445,20 @@ func (rr requestRecord) toResp(
14271445
case *kvpb.DeleteRequest:
14281446
// To correctly populate FoundKey in the response, we must prefer any
14291447
// buffered values (if they exist).
1430-
var foundKey bool
1448+
var resp kvpb.DeleteResponse
14311449
val, _, served := twb.maybeServeRead(req.Key, req.Sequence)
14321450
if served {
14331451
log.VEventf(ctx, 2, "serving read portion of %s on key %s from the buffer", req.Method(), req.Key)
1434-
foundKey = val.IsPresent()
1452+
resp.FoundKey = val.IsPresent()
14351453
} else if rr.transformed {
14361454
// We sent a GetRequest to the KV layer to acquire an exclusive lock
14371455
// on the key, populate FoundKey using the response.
14381456
getResp := br.GetInner().(*kvpb.GetResponse)
14391457
if log.ExpensiveLogEnabled(ctx, 2) {
14401458
log.Eventf(ctx, "synthesizing DeleteResponse from GetResponse: %#v", getResp)
14411459
}
1442-
foundKey = getResp.Value.IsPresent()
1460+
resp.FoundKey = getResp.Value.IsPresent()
1461+
resp.ResumeSpan = getResp.ResumeSpan
14431462
} else {
14441463
// NB: If MustAcquireExclusiveLock wasn't set by the client then we
14451464
// eschew sending a Get request to the KV layer just to populate
@@ -1451,7 +1470,14 @@ func (rr requestRecord) toResp(
14511470
// TODO(arul): improve the FoundKey semantics to have callers opt
14521471
// into whether they care about the key being found. Alternatively,
14531472
// clarify the behaviour on DeleteRequest.
1454-
foundKey = false
1473+
resp.FoundKey = false
1474+
}
1475+
1476+
ru.MustSetInner(&resp)
1477+
if resp.ResumeSpan != nil {
1478+
// When the Get was incomplete, we haven't actually processed this
1479+
// Del, so we cannot buffer the write.
1480+
break
14551481
}
14561482

14571483
var dla *bufferedDurableLockAcquisition
@@ -1463,14 +1489,16 @@ func (rr requestRecord) toResp(
14631489
}
14641490
}
14651491

1466-
ru.MustSetInner(&kvpb.DeleteResponse{
1467-
FoundKey: foundKey,
1468-
})
14691492
twb.addToBuffer(req.Key, roachpb.Value{}, req.Sequence, req.KVNemesisSeq, dla)
14701493

14711494
case *kvpb.GetRequest:
14721495
val, _, served := twb.maybeServeRead(req.Key, req.Sequence)
14731496
if served {
1497+
// TODO(yuzefovich): we're effectively ignoring the limits of
1498+
// BatchRequest when serving the Get from the buffer. We should
1499+
// consider setting the ResumeSpan if a limit has already been
1500+
// reached by this point. This will force us to set ResumeSpan on
1501+
// all remaining requests in the batch.
14741502
getResp := &kvpb.GetResponse{}
14751503
if val.IsPresent() {
14761504
getResp.Value = val
@@ -2420,8 +2448,6 @@ func (s *respIter) startKey() roachpb.Key {
24202448
// For ReverseScans, the EndKey of the ResumeSpan is updated to indicate the
24212449
// start key for the "next" page, which is exactly the last key that was
24222450
// reverse-scanned for the current response.
2423-
// TODO(yuzefovich): we should have some unit tests that exercise the
2424-
// ResumeSpan case.
24252451
if s.resumeSpan != nil {
24262452
return s.resumeSpan.EndKey
24272453
}
@@ -2492,6 +2518,11 @@ func makeRespSizeHelper(it *respIter) respSizeHelper {
24922518
}
24932519

24942520
func (h *respSizeHelper) acceptBuffer(key roachpb.Key, value *roachpb.Value) {
2521+
// TODO(yuzefovich): we're effectively ignoring the limits of BatchRequest
2522+
// when serving the reads from the buffer. We should consider checking how
2523+
// many keys and bytes have already been included to see whether we've
2524+
// reached a limit, and set the ResumeSpan if so (which can result in some
2525+
// wasted work by the server).
24952526
h.numKeys++
24962527
lenKV, _ := encKVLength(key, value)
24972528
h.numBytes += int64(lenKV)

pkg/kv/kvclient/kvcoord/txn_interceptor_write_buffer_test.go

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1331,6 +1331,75 @@ func TestTxnWriteBufferRespectsMustAcquireExclusiveLock(t *testing.T) {
13311331
require.IsType(t, &kvpb.EndTxnResponse{}, br.Responses[0].GetInner())
13321332
}
13331333

1334+
// TestTxnWriteBufferResumeSpans verifies that the txnWriteBuffer behaves
1335+
// correctly in the presence of BatchRequest's limits that result in non-nil
1336+
// ResumeSpans.
1337+
// TODO(yuzefovich): extend the test for Scans and ReverseScans.
1338+
func TestTxnWriteBufferResumeSpans(t *testing.T) {
1339+
defer leaktest.AfterTest(t)()
1340+
defer log.Scope(t).Close(t)
1341+
ctx := context.Background()
1342+
twb, mockSender, _ := makeMockTxnWriteBuffer(ctx)
1343+
1344+
txn := makeTxnProto()
1345+
txn.Sequence = 1
1346+
keyA, keyB, keyC := roachpb.Key("a"), roachpb.Key("b"), roachpb.Key("c")
1347+
1348+
// Delete 3 keys while setting MaxSpanRequestKeys to 2 (only the first two
1349+
// Dels should be processed).
1350+
ba := &kvpb.BatchRequest{}
1351+
ba.Header = kvpb.Header{Txn: &txn, MaxSpanRequestKeys: 2}
1352+
for _, k := range []roachpb.Key{keyA, keyB, keyC} {
1353+
del := delArgs(k, txn.Sequence)
1354+
// Set MustAcquireExclusiveLock so that Del is transformed into Get.
1355+
del.MustAcquireExclusiveLock = true
1356+
ba.Add(del)
1357+
}
1358+
1359+
// Simulate a scenario where each transformed Get finds something and the
1360+
// limit is reached after the second Get.
1361+
mockSender.MockSend(func(ba *kvpb.BatchRequest) (*kvpb.BatchResponse, *kvpb.Error) {
1362+
require.Equal(t, int64(2), ba.MaxSpanRequestKeys)
1363+
require.Len(t, ba.Requests, 3)
1364+
require.IsType(t, &kvpb.GetRequest{}, ba.Requests[0].GetInner())
1365+
require.IsType(t, &kvpb.GetRequest{}, ba.Requests[1].GetInner())
1366+
require.IsType(t, &kvpb.GetRequest{}, ba.Requests[2].GetInner())
1367+
1368+
br := ba.CreateReply()
1369+
br.Txn = ba.Txn
1370+
br.Responses = []kvpb.ResponseUnion{
1371+
{Value: &kvpb.ResponseUnion_Get{
1372+
Get: &kvpb.GetResponse{Value: &roachpb.Value{RawBytes: []byte("a")}},
1373+
}},
1374+
{Value: &kvpb.ResponseUnion_Get{
1375+
Get: &kvpb.GetResponse{Value: &roachpb.Value{RawBytes: []byte("b")}},
1376+
}},
1377+
{Value: &kvpb.ResponseUnion_Get{
1378+
Get: &kvpb.GetResponse{ResponseHeader: kvpb.ResponseHeader{
1379+
ResumeSpan: &roachpb.Span{Key: keyC},
1380+
}},
1381+
}},
1382+
}
1383+
return br, nil
1384+
})
1385+
1386+
br, pErr := twb.SendLocked(ctx, ba)
1387+
require.Nil(t, pErr)
1388+
require.NotNil(t, br)
1389+
1390+
// Even though the txnWriteBuffer did not send any Del requests to the KV
1391+
// layer above, the responses should still be populated.
1392+
require.Len(t, br.Responses, 3)
1393+
require.Equal(t, &kvpb.DeleteResponse{FoundKey: true}, br.Responses[0].GetInner())
1394+
require.Equal(t, &kvpb.DeleteResponse{FoundKey: true}, br.Responses[1].GetInner())
1395+
// The last Del wasn't processed, so we should see the ResumeSpan set in the
1396+
// header.
1397+
require.NotNil(t, br.Responses[2].GetInner().(*kvpb.DeleteResponse).ResumeSpan)
1398+
1399+
// Verify that only two writes are buffered.
1400+
require.Equal(t, 2, len(twb.testingBufferedWritesAsSlice()))
1401+
}
1402+
13341403
// TestTxnWriteBufferMustSortBatchesBySequenceNumber verifies that flushed
13351404
// batches are sorted in sequence number order, as currently required by the txn
13361405
// pipeliner interceptor.

pkg/kv/kvpb/api.proto

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2779,11 +2779,16 @@ message Header {
27792779
// - RevertRangeRequest
27802780
// - ResolveIntentRangeRequest
27812781
// - QueryLocksRequest
2782+
// - IsSpanEmptyRequest
27822783
//
2783-
// The following two requests types are also allowed in the batch, although
2784-
// the limit has no effect on them:
2784+
// The following request types are also allowed in the batch, although the
2785+
// limit has no effect on them:
2786+
// - ExportRequest
27852787
// - QueryIntentRequest
27862788
// - EndTxnRequest
2789+
// - ResolveIntentRequest
2790+
// - DeleteRequest
2791+
// - PutRequest
27872792
//
27882793
// [*] DeleteRangeRequests are generally not allowed to be batched together
27892794
// with a commit (i.e. 1PC), except if Require1PC is also set. See #37457.

0 commit comments

Comments
 (0)