Skip to content

Commit 000a071

Browse files
authored
Merge pull request #152096 from yuzefovich/blathers/backport-release-25.2.5-rc-151767
release-25.2.5-rc: kvcoord: fix txnWriteBuffer for batches with limits and Dels
2 parents b08173d + 174d42f commit 000a071

File tree

4 files changed

+119
-14
lines changed

4 files changed

+119
-14
lines changed

pkg/kv/kvclient/kvcoord/dist_sender.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1070,9 +1070,8 @@ func (ds *DistSender) initAndVerifyBatch(ctx context.Context, ba *kvpb.BatchRequ
10701070
}
10711071

10721072
if ba.MaxSpanRequestKeys != 0 || ba.TargetBytes != 0 {
1073-
// Verify that the batch contains only specific range requests or the
1074-
// EndTxnRequest. Verify that a batch with a ReverseScan only contains
1075-
// ReverseScan range requests.
1073+
// Verify that the batch contains only specific requests. Verify that a
1074+
// batch with a ReverseScan only contains ReverseScan range requests.
10761075
var foundForward, foundReverse bool
10771076
for _, req := range ba.Requests {
10781077
inner := req.GetInner()

pkg/kv/kvclient/kvcoord/txn_interceptor_write_buffer.go

Lines changed: 41 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -326,7 +326,15 @@ func (twb *txnWriteBuffer) validateRequests(ba *kvpb.BatchRequest) error {
326326
if t.OriginTimestamp.IsSet() {
327327
return unsupportedOptionError(t.Method(), "OriginTimestamp")
328328
}
329+
assertTrue(ba.MaxSpanRequestKeys == 0 && ba.TargetBytes == 0, "unexpectedly found CPut in a BatchRequest with a limit")
329330
case *kvpb.PutRequest:
331+
// TODO(yuzefovich): the DistSender allows Puts to be in batches
332+
// with limits, which can happen when we're forced to flush the
333+
// buffered Puts, and the batch we piggy-back on has a limit set.
334+
// However, SQL never constructs such a batch on its own, so we're
335+
// asserting the expectations from SQL. Figure out how to reconcile
336+
// this with more permissive DistSender-level checks.
337+
assertTrue(ba.MaxSpanRequestKeys == 0 && ba.TargetBytes == 0, "unexpectedly found Put in a BatchRequest with a limit")
330338
case *kvpb.DeleteRequest:
331339
case *kvpb.GetRequest:
332340
// ReturnRawMVCCValues is unsupported because we don't know how to serve
@@ -1058,6 +1066,11 @@ func (rr requestRecord) toResp(
10581066
// We only use the response from KV if there wasn't already a
10591067
// buffered value for this key that our transaction wrote
10601068
// previously.
1069+
// TODO(yuzefovich): for completeness, we should check whether
1070+
// ResumeSpan is non-nil, in which case the response from KV is
1071+
// incomplete. This can happen when MaxSpanRequestKeys and/or
1072+
// TargetBytes limits are set on the batch, and SQL currently
1073+
// doesn't do that for batches with CPuts.
10611074
val = br.GetInner().(*kvpb.GetResponse).Value
10621075
}
10631076

@@ -1078,17 +1091,22 @@ func (rr requestRecord) toResp(
10781091
twb.addToBuffer(req.Key, req.Value, req.Sequence, req.KVNemesisSeq)
10791092

10801093
case *kvpb.PutRequest:
1094+
// TODO(yuzefovich): for completeness, we should check whether
1095+
// ResumeSpan is non-nil if we transformed the request, in which case
1096+
// the response from KV is incomplete. This can happen when
1097+
// MaxSpanRequestKeys and/or TargetBytes limits are set on the batch,
1098+
// and SQL currently doesn't do that for batches with Puts.
10811099
ru.MustSetInner(&kvpb.PutResponse{})
10821100
twb.addToBuffer(req.Key, req.Value, req.Sequence, req.KVNemesisSeq)
10831101

10841102
case *kvpb.DeleteRequest:
10851103
// To correctly populate FoundKey in the response, we must prefer any
10861104
// buffered values (if they exist).
1087-
var foundKey bool
1105+
var resp kvpb.DeleteResponse
10881106
val, served := twb.maybeServeRead(req.Key, req.Sequence)
10891107
if served {
10901108
log.VEventf(ctx, 2, "serving read portion of %s on key %s from the buffer", req.Method(), req.Key)
1091-
foundKey = val.IsPresent()
1109+
resp.FoundKey = val.IsPresent()
10921110
} else if req.MustAcquireExclusiveLock {
10931111
// We sent a GetRequest to the KV layer to acquire an exclusive lock
10941112
// on the key, regardless of whether the key already exists or not.
@@ -1097,7 +1115,8 @@ func (rr requestRecord) toResp(
10971115
if log.ExpensiveLogEnabled(ctx, 2) {
10981116
log.Eventf(ctx, "synthesizing DeleteResponse from GetResponse: %#v", getResp)
10991117
}
1100-
foundKey = getResp.Value.IsPresent()
1118+
resp.FoundKey = getResp.Value.IsPresent()
1119+
resp.ResumeSpan = getResp.ResumeSpan
11011120
} else {
11021121
// NB: If MustAcquireExclusiveLock wasn't set by the client then we
11031122
// eschew sending a Get request to the KV layer just to populate
@@ -1109,16 +1128,26 @@ func (rr requestRecord) toResp(
11091128
// TODO(arul): improve the FoundKey semantics to have callers opt
11101129
// into whether they care about the key being found. Alternatively,
11111130
// clarify the behaviour on DeleteRequest.
1112-
foundKey = false
1131+
resp.FoundKey = false
11131132
}
1114-
ru.MustSetInner(&kvpb.DeleteResponse{
1115-
FoundKey: foundKey,
1116-
})
1133+
1134+
ru.MustSetInner(&resp)
1135+
if resp.ResumeSpan != nil {
1136+
// When the Get was incomplete, we haven't actually processed this
1137+
// Del, so we cannot buffer the write.
1138+
break
1139+
}
1140+
11171141
twb.addToBuffer(req.Key, roachpb.Value{}, req.Sequence, req.KVNemesisSeq)
11181142

11191143
case *kvpb.GetRequest:
11201144
val, served := twb.maybeServeRead(req.Key, req.Sequence)
11211145
if served {
1146+
// TODO(yuzefovich): we're effectively ignoring the limits of
1147+
// BatchRequest when serving the Get from the buffer. We should
1148+
// consider setting the ResumeSpan if a limit has already been
1149+
// reached by this point. This will force us to set ResumeSpan on
1150+
// all remaining requests in the batch.
11221151
getResp := &kvpb.GetResponse{}
11231152
if val.IsPresent() {
11241153
getResp.Value = val
@@ -1612,8 +1641,6 @@ func (s *respIter) startKey() roachpb.Key {
16121641
// For ReverseScans, the EndKey of the ResumeSpan is updated to indicate the
16131642
// start key for the "next" page, which is exactly the last key that was
16141643
// reverse-scanned for the current response.
1615-
// TODO(yuzefovich): we should have some unit tests that exercise the
1616-
// ResumeSpan case.
16171644
if s.resumeSpan != nil {
16181645
return s.resumeSpan.EndKey
16191646
}
@@ -1684,6 +1711,11 @@ func makeRespSizeHelper(it *respIter) respSizeHelper {
16841711
}
16851712

16861713
func (h *respSizeHelper) acceptBuffer(key roachpb.Key, value *roachpb.Value) {
1714+
// TODO(yuzefovich): we're effectively ignoring the limits of BatchRequest
1715+
// when serving the reads from the buffer. We should consider checking how
1716+
// many keys and bytes have already been included to see whether we've
1717+
// reached a limit, and set the ResumeSpan if so (which can result in some
1718+
// wasted work by the server).
16871719
h.numKeys++
16881720
lenKV, _ := encKVLength(key, value)
16891721
h.numBytes += int64(lenKV)

pkg/kv/kvclient/kvcoord/txn_interceptor_write_buffer_test.go

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1310,6 +1310,75 @@ func TestTxnWriteBufferRespectsMustAcquireExclusiveLock(t *testing.T) {
13101310
require.IsType(t, &kvpb.EndTxnResponse{}, br.Responses[0].GetInner())
13111311
}
13121312

1313+
// TestTxnWriteBufferResumeSpans verifies that the txnWriteBuffer behaves
1314+
// correctly in presence of BatchRequest's limits that result in non-nil
1315+
// ResumeSpans.
1316+
// TODO(yuzefovich): extend the test for Scans and ReverseScans.
1317+
func TestTxnWriteBufferResumeSpans(t *testing.T) {
1318+
defer leaktest.AfterTest(t)()
1319+
defer log.Scope(t).Close(t)
1320+
ctx := context.Background()
1321+
twb, mockSender := makeMockTxnWriteBuffer(cluster.MakeClusterSettings())
1322+
1323+
txn := makeTxnProto()
1324+
txn.Sequence = 1
1325+
keyA, keyB, keyC := roachpb.Key("a"), roachpb.Key("b"), roachpb.Key("c")
1326+
1327+
// Delete 3 keys while setting MaxSpanRequestKeys to 2 (only the first two
1328+
// Dels should be processed).
1329+
ba := &kvpb.BatchRequest{}
1330+
ba.Header = kvpb.Header{Txn: &txn, MaxSpanRequestKeys: 2}
1331+
for _, k := range []roachpb.Key{keyA, keyB, keyC} {
1332+
del := delArgs(k, txn.Sequence)
1333+
// Set MustAcquireExclusiveLock so that Del is transformed into Get.
1334+
del.MustAcquireExclusiveLock = true
1335+
ba.Add(del)
1336+
}
1337+
1338+
// Simulate a scenario where each transformed Get finds something and the
1339+
// limit is reached after the second Get.
1340+
mockSender.MockSend(func(ba *kvpb.BatchRequest) (*kvpb.BatchResponse, *kvpb.Error) {
1341+
require.Equal(t, int64(2), ba.MaxSpanRequestKeys)
1342+
require.Len(t, ba.Requests, 3)
1343+
require.IsType(t, &kvpb.GetRequest{}, ba.Requests[0].GetInner())
1344+
require.IsType(t, &kvpb.GetRequest{}, ba.Requests[1].GetInner())
1345+
require.IsType(t, &kvpb.GetRequest{}, ba.Requests[2].GetInner())
1346+
1347+
br := ba.CreateReply()
1348+
br.Txn = ba.Txn
1349+
br.Responses = []kvpb.ResponseUnion{
1350+
{Value: &kvpb.ResponseUnion_Get{
1351+
Get: &kvpb.GetResponse{Value: &roachpb.Value{RawBytes: []byte("a")}},
1352+
}},
1353+
{Value: &kvpb.ResponseUnion_Get{
1354+
Get: &kvpb.GetResponse{Value: &roachpb.Value{RawBytes: []byte("b")}},
1355+
}},
1356+
{Value: &kvpb.ResponseUnion_Get{
1357+
Get: &kvpb.GetResponse{ResponseHeader: kvpb.ResponseHeader{
1358+
ResumeSpan: &roachpb.Span{Key: keyC},
1359+
}},
1360+
}},
1361+
}
1362+
return br, nil
1363+
})
1364+
1365+
br, pErr := twb.SendLocked(ctx, ba)
1366+
require.Nil(t, pErr)
1367+
require.NotNil(t, br)
1368+
1369+
// Even though the txnWriteBuffer did not send any Del requests to the KV
1370+
// layer above, the responses should still be populated.
1371+
require.Len(t, br.Responses, 3)
1372+
require.Equal(t, &kvpb.DeleteResponse{FoundKey: true}, br.Responses[0].GetInner())
1373+
require.Equal(t, &kvpb.DeleteResponse{FoundKey: true}, br.Responses[1].GetInner())
1374+
// The last Del wasn't processed, so we should see the ResumeSpan set in the
1375+
// header.
1376+
require.NotNil(t, br.Responses[2].GetInner().(*kvpb.DeleteResponse).ResumeSpan)
1377+
1378+
// Verify that only two writes are buffered.
1379+
require.Equal(t, 2, len(twb.testingBufferedWritesAsSlice()))
1380+
}
1381+
13131382
// TestTxnWriteBufferMustSortBatchesBySequenceNumber verifies that flushed
13141383
// batches are sorted in sequence number order, as currently required by the txn
13151384
// pipeliner interceptor.

pkg/kv/kvpb/api.proto

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2808,11 +2808,16 @@ message Header {
28082808
// - RevertRangeRequest
28092809
// - ResolveIntentRangeRequest
28102810
// - QueryLocksRequest
2811+
// - IsSpanEmptyRequest
28112812
//
2812-
// The following two requests types are also allowed in the batch, although
2813-
// the limit has no effect on them:
2813+
// The following request types are also allowed in the batch, although the
2814+
// limit has no effect on them:
2815+
// - ExportRequest
28142816
// - QueryIntentRequest
28152817
// - EndTxnRequest
2818+
// - ResolveIntentRequest
2819+
// - DeleteRequest
2820+
// - PutRequest
28162821
//
28172822
// [*] DeleteRangeRequests are generally not allowed to be batched together
28182823
// with a commit (i.e. 1PC), except if Require1PC is also set. See #37457.

0 commit comments

Comments
 (0)