Skip to content

Commit 174d42f

Browse files
committed
kvcoord: fix txnWriteBuffer for batches with limits and Dels
Previously, the txnWriteBuffer was oblivious to the fact that some transformed requests might be returned incomplete due to limits set on the BatchRequest (either TargetBytes or MaxSpanRequestKeys), so it would incorrectly think that it has acquired locks on some keys when it hasn't. Usage from SQL was only exposed to the bug via special delete-range fast-path where we used point Dels (i.e. a stmt of the form `DELETE FROM t WHERE k IN (<ids>)` where there are gaps between `id`s) since it always sets a key limit of 600. This commit fixes this particular issue for Dels transformed into Gets and adds a couple of assertions that we don't see batches with CPuts and/or Puts with the limits set. Additionally, it adjusts the comment to indicate which requests are allowed in batches with limits. Given that this feature is disabled by default and in the private preview AND it's limited to the DELETE fast-path when more than 600 keys are deleted, I decided to omit the release note. Release note: None
1 parent b08173d commit 174d42f

File tree

4 files changed

+119
-14
lines changed

4 files changed

+119
-14
lines changed

pkg/kv/kvclient/kvcoord/dist_sender.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1070,9 +1070,8 @@ func (ds *DistSender) initAndVerifyBatch(ctx context.Context, ba *kvpb.BatchRequ
10701070
}
10711071

10721072
if ba.MaxSpanRequestKeys != 0 || ba.TargetBytes != 0 {
1073-
// Verify that the batch contains only specific range requests or the
1074-
// EndTxnRequest. Verify that a batch with a ReverseScan only contains
1075-
// ReverseScan range requests.
1073+
// Verify that the batch contains only specific requests. Verify that a
1074+
// batch with a ReverseScan only contains ReverseScan range requests.
10761075
var foundForward, foundReverse bool
10771076
for _, req := range ba.Requests {
10781077
inner := req.GetInner()

pkg/kv/kvclient/kvcoord/txn_interceptor_write_buffer.go

Lines changed: 41 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -326,7 +326,15 @@ func (twb *txnWriteBuffer) validateRequests(ba *kvpb.BatchRequest) error {
326326
if t.OriginTimestamp.IsSet() {
327327
return unsupportedOptionError(t.Method(), "OriginTimestamp")
328328
}
329+
assertTrue(ba.MaxSpanRequestKeys == 0 && ba.TargetBytes == 0, "unexpectedly found CPut in a BatchRequest with a limit")
329330
case *kvpb.PutRequest:
331+
// TODO(yuzefovich): the DistSender allows Puts to be in batches
332+
// with limits, which can happen when we're forced to flush the
333+
// buffered Puts, and the batch we piggy-back on has a limit set.
334+
// However, SQL never constructs such a batch on its own, so we're
335+
// asserting the expectations from SQL. Figure out how to reconcile
336+
// this with more permissive DistSender-level checks.
337+
assertTrue(ba.MaxSpanRequestKeys == 0 && ba.TargetBytes == 0, "unexpectedly found Put in a BatchRequest with a limit")
330338
case *kvpb.DeleteRequest:
331339
case *kvpb.GetRequest:
332340
// ReturnRawMVCCValues is unsupported because we don't know how to serve
@@ -1058,6 +1066,11 @@ func (rr requestRecord) toResp(
10581066
// We only use the response from KV if there wasn't already a
10591067
// buffered value for this key that our transaction wrote
10601068
// previously.
1069+
// TODO(yuzefovich): for completeness, we should check whether
1070+
// ResumeSpan is non-nil, in which case the response from KV is
1071+
// incomplete. This can happen when MaxSpanRequestKeys and/or
1072+
// TargetBytes limits are set on the batch, and SQL currently
1073+
// doesn't do that for batches with CPuts.
10611074
val = br.GetInner().(*kvpb.GetResponse).Value
10621075
}
10631076

@@ -1078,17 +1091,22 @@ func (rr requestRecord) toResp(
10781091
twb.addToBuffer(req.Key, req.Value, req.Sequence, req.KVNemesisSeq)
10791092

10801093
case *kvpb.PutRequest:
1094+
// TODO(yuzefovich): for completeness, we should check whether
1095+
// ResumeSpan is non-nil if we transformed the request, in which case
1096+
// the response from KV is incomplete. This can happen when
1097+
// MaxSpanRequestKeys and/or TargetBytes limits are set on the batch,
1098+
// and SQL currently doesn't do that for batches with Puts.
10811099
ru.MustSetInner(&kvpb.PutResponse{})
10821100
twb.addToBuffer(req.Key, req.Value, req.Sequence, req.KVNemesisSeq)
10831101

10841102
case *kvpb.DeleteRequest:
10851103
// To correctly populate FoundKey in the response, we must prefer any
10861104
// buffered values (if they exist).
1087-
var foundKey bool
1105+
var resp kvpb.DeleteResponse
10881106
val, served := twb.maybeServeRead(req.Key, req.Sequence)
10891107
if served {
10901108
log.VEventf(ctx, 2, "serving read portion of %s on key %s from the buffer", req.Method(), req.Key)
1091-
foundKey = val.IsPresent()
1109+
resp.FoundKey = val.IsPresent()
10921110
} else if req.MustAcquireExclusiveLock {
10931111
// We sent a GetRequest to the KV layer to acquire an exclusive lock
10941112
// on the key, regardless of whether the key already exists or not.
@@ -1097,7 +1115,8 @@ func (rr requestRecord) toResp(
10971115
if log.ExpensiveLogEnabled(ctx, 2) {
10981116
log.Eventf(ctx, "synthesizing DeleteResponse from GetResponse: %#v", getResp)
10991117
}
1100-
foundKey = getResp.Value.IsPresent()
1118+
resp.FoundKey = getResp.Value.IsPresent()
1119+
resp.ResumeSpan = getResp.ResumeSpan
11011120
} else {
11021121
// NB: If MustAcquireExclusiveLock wasn't set by the client then we
11031122
// eschew sending a Get request to the KV layer just to populate
@@ -1109,16 +1128,26 @@ func (rr requestRecord) toResp(
11091128
// TODO(arul): improve the FoundKey semantics to have callers opt
11101129
// into whether the care about the key being found. Alternatively,
11111130
// clarify the behaviour on DeleteRequest.
1112-
foundKey = false
1131+
resp.FoundKey = false
11131132
}
1114-
ru.MustSetInner(&kvpb.DeleteResponse{
1115-
FoundKey: foundKey,
1116-
})
1133+
1134+
ru.MustSetInner(&resp)
1135+
if resp.ResumeSpan != nil {
1136+
// When the Get was incomplete, we haven't actually processed this
1137+
// Del, so we cannot buffer the write.
1138+
break
1139+
}
1140+
11171141
twb.addToBuffer(req.Key, roachpb.Value{}, req.Sequence, req.KVNemesisSeq)
11181142

11191143
case *kvpb.GetRequest:
11201144
val, served := twb.maybeServeRead(req.Key, req.Sequence)
11211145
if served {
1146+
// TODO(yuzefovich): we're effectively ignoring the limits of
1147+
// BatchRequest when serving the Get from the buffer. We should
1148+
// consider setting the ResumeSpan if a limit has already been
1149+
// reached by this point. This will force us to set ResumeSpan on
1150+
// all remaining requests in the batch.
11221151
getResp := &kvpb.GetResponse{}
11231152
if val.IsPresent() {
11241153
getResp.Value = val
@@ -1612,8 +1641,6 @@ func (s *respIter) startKey() roachpb.Key {
16121641
// For ReverseScans, the EndKey of the ResumeSpan is updated to indicate the
16131642
// start key for the "next" page, which is exactly the last key that was
16141643
// reverse-scanned for the current response.
1615-
// TODO(yuzefovich): we should have some unit tests that exercise the
1616-
// ResumeSpan case.
16171644
if s.resumeSpan != nil {
16181645
return s.resumeSpan.EndKey
16191646
}
@@ -1684,6 +1711,11 @@ func makeRespSizeHelper(it *respIter) respSizeHelper {
16841711
}
16851712

16861713
func (h *respSizeHelper) acceptBuffer(key roachpb.Key, value *roachpb.Value) {
1714+
// TODO(yuzefovich): we're effectively ignoring the limits of BatchRequest
1715+
// when serving the reads from the buffer. We should consider checking how
1716+
// many keys and bytes have already been included to see whether we've
1717+
// reached a limit, and set the ResumeSpan if so (which can result in some
1718+
// wasted work by the server).
16871719
h.numKeys++
16881720
lenKV, _ := encKVLength(key, value)
16891721
h.numBytes += int64(lenKV)

pkg/kv/kvclient/kvcoord/txn_interceptor_write_buffer_test.go

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1310,6 +1310,75 @@ func TestTxnWriteBufferRespectsMustAcquireExclusiveLock(t *testing.T) {
13101310
require.IsType(t, &kvpb.EndTxnResponse{}, br.Responses[0].GetInner())
13111311
}
13121312

1313+
// TestTxnWriteBufferResumeSpans verifies that the txnWriteBuffer behaves
1314+
// correctly in presence of BatchRequest's limits that result in non-nil
1315+
// ResumeSpans.
1316+
// TODO(yuzefovich): extend the test for Scans and ReverseScans.
1317+
func TestTxnWriteBufferResumeSpans(t *testing.T) {
1318+
defer leaktest.AfterTest(t)()
1319+
defer log.Scope(t).Close(t)
1320+
ctx := context.Background()
1321+
twb, mockSender := makeMockTxnWriteBuffer(cluster.MakeClusterSettings())
1322+
1323+
txn := makeTxnProto()
1324+
txn.Sequence = 1
1325+
keyA, keyB, keyC := roachpb.Key("a"), roachpb.Key("b"), roachpb.Key("c")
1326+
1327+
// Delete 3 keys while setting MaxSpanRequestKeys to 2 (only the first two
1328+
// Dels should be processed).
1329+
ba := &kvpb.BatchRequest{}
1330+
ba.Header = kvpb.Header{Txn: &txn, MaxSpanRequestKeys: 2}
1331+
for _, k := range []roachpb.Key{keyA, keyB, keyC} {
1332+
del := delArgs(k, txn.Sequence)
1333+
// Set MustAcquireExclusiveLock so that Del is transformed into Get.
1334+
del.MustAcquireExclusiveLock = true
1335+
ba.Add(del)
1336+
}
1337+
1338+
// Simulate a scenario where each transformed Get finds something and the
1339+
// limit is reached after the second Get.
1340+
mockSender.MockSend(func(ba *kvpb.BatchRequest) (*kvpb.BatchResponse, *kvpb.Error) {
1341+
require.Equal(t, int64(2), ba.MaxSpanRequestKeys)
1342+
require.Len(t, ba.Requests, 3)
1343+
require.IsType(t, &kvpb.GetRequest{}, ba.Requests[0].GetInner())
1344+
require.IsType(t, &kvpb.GetRequest{}, ba.Requests[1].GetInner())
1345+
require.IsType(t, &kvpb.GetRequest{}, ba.Requests[2].GetInner())
1346+
1347+
br := ba.CreateReply()
1348+
br.Txn = ba.Txn
1349+
br.Responses = []kvpb.ResponseUnion{
1350+
{Value: &kvpb.ResponseUnion_Get{
1351+
Get: &kvpb.GetResponse{Value: &roachpb.Value{RawBytes: []byte("a")}},
1352+
}},
1353+
{Value: &kvpb.ResponseUnion_Get{
1354+
Get: &kvpb.GetResponse{Value: &roachpb.Value{RawBytes: []byte("b")}},
1355+
}},
1356+
{Value: &kvpb.ResponseUnion_Get{
1357+
Get: &kvpb.GetResponse{ResponseHeader: kvpb.ResponseHeader{
1358+
ResumeSpan: &roachpb.Span{Key: keyC},
1359+
}},
1360+
}},
1361+
}
1362+
return br, nil
1363+
})
1364+
1365+
br, pErr := twb.SendLocked(ctx, ba)
1366+
require.Nil(t, pErr)
1367+
require.NotNil(t, br)
1368+
1369+
// Even though the txnWriteBuffer did not send any Del requests to the KV
1370+
// layer above, the responses should still be populated.
1371+
require.Len(t, br.Responses, 3)
1372+
require.Equal(t, &kvpb.DeleteResponse{FoundKey: true}, br.Responses[0].GetInner())
1373+
require.Equal(t, &kvpb.DeleteResponse{FoundKey: true}, br.Responses[1].GetInner())
1374+
// The last Del wasn't processed, so we should see the ResumeSpan set in the
1375+
// header.
1376+
require.NotNil(t, br.Responses[2].GetInner().(*kvpb.DeleteResponse).ResumeSpan)
1377+
1378+
// Verify that only two writes are buffered.
1379+
require.Equal(t, 2, len(twb.testingBufferedWritesAsSlice()))
1380+
}
1381+
13131382
// TestTxnWriteBufferMustSortBatchesBySequenceNumber verifies that flushed
13141383
// batches are sorted in sequence number order, as currently required by the txn
13151384
// pipeliner interceptor.

pkg/kv/kvpb/api.proto

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2808,11 +2808,16 @@ message Header {
28082808
// - RevertRangeRequest
28092809
// - ResolveIntentRangeRequest
28102810
// - QueryLocksRequest
2811+
// - IsSpanEmptyRequest
28112812
//
2812-
// The following two requests types are also allowed in the batch, although
2813-
// the limit has no effect on them:
2813+
// The following requests types are also allowed in the batch, although the
2814+
// limit has no effect on them:
2815+
// - ExportRequest
28142816
// - QueryIntentRequest
28152817
// - EndTxnRequest
2818+
// - ResolveIntentRequest
2819+
// - DeleteRequest
2820+
// - PutRequest
28162821
//
28172822
// [*] DeleteRangeRequests are generally not allowed to be batched together
28182823
// with a commit (i.e. 1PC), except if Require1PC is also set. See #37457.

0 commit comments

Comments
 (0)