Skip to content

Commit 22b0b5b

Browse files
craig[bot]stevendanna
andcommitted
Merge #153400
153400: kvcoord: set ResumeSpan on unprocessed point requests r=yuzefovich a=stevendanna Previously, if batch execution was stopped early in dist sender, the returned batch could have requests that were not processed but which didn't have a resume-span sent. The impact of this oversight was minimal since SQL does not create such read-write batches. The txnWriteBuffer may create such batches, but only if undocumented, test-only settings were in use. We might want to consider whether such read-write batches are _ever_ needed by SQL or other internal database users. Fixes #153357 Release note: None Co-authored-by: Steven Danna <[email protected]>
2 parents eb447f5 + beeb29a commit 22b0b5b

File tree

2 files changed

+78
-6
lines changed

2 files changed

+78
-6
lines changed

pkg/kv/kvclient/kvcoord/dist_sender.go

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2411,8 +2411,8 @@ func fillSkippedResponses(
24112411
func maybeSetResumeSpan(
24122412
req kvpb.Request, hdr *kvpb.ResponseHeader, nextKey roachpb.RKey, isReverse bool,
24132413
) {
2414-
if _, ok := req.(*kvpb.GetRequest); ok {
2415-
// This is a Get request. There are three possibilities:
2414+
if !kvpb.IsRange(req) {
2415+
// This is a point request. There are three possibilities:
24162416
//
24172417
// 1. The request was completed. In this case we don't want a ResumeSpan.
24182418
//
@@ -2443,10 +2443,6 @@ func maybeSetResumeSpan(
24432443
return
24442444
}
24452445

2446-
if !kvpb.IsRange(req) {
2447-
return
2448-
}
2449-
24502446
origHeader := req.Header()
24512447
if isReverse {
24522448
if hdr.ResumeSpan != nil {

pkg/kv/kvclient/kvcoord/dist_sender_server_test.go

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4535,3 +4535,79 @@ func TestUnexpectedCommitOnTxnRecovery(t *testing.T) {
45354535
close(blockCh)
45364536
wg.Wait()
45374537
}
4538+
4539+
// TestUnprocessedWritesHaveResumeSpanSet tests that writes that weren't
4540+
// processed because proceeding requests exhausted a TargetBytes or
4541+
// MaxSpanRequestKeys limit have an appropriate ResumeSpan set.
4542+
func TestUnprocessedWritesHaveResumeSpanSet(t *testing.T) {
4543+
defer leaktest.AfterTest(t)()
4544+
defer log.Scope(t).Close(t)
4545+
ctx := context.Background()
4546+
4547+
s, db := startNoSplitMergeServer(t)
4548+
defer s.Stopper().Stop(ctx)
4549+
4550+
var (
4551+
existingKey = "a"
4552+
splitPoint = "b"
4553+
putKey = "c"
4554+
)
4555+
4556+
// Set up a split so that our read and write below are in different ranges.
4557+
_, _, err := s.SplitRange(roachpb.Key(splitPoint))
4558+
require.NoError(t, err)
4559+
require.NoError(t, db.Put(ctx, existingKey, "existing-value"))
4560+
4561+
err = db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
4562+
b := txn.NewBatch()
4563+
b.Header.MaxSpanRequestKeys = 1
4564+
// We use a ScanForUpdate here rather than Scan to ensure that the
4565+
// quota-consuming request is also a write (see #153397)
4566+
b.ScanForUpdate(existingKey, splitPoint, kvpb.GuaranteedDurability)
4567+
b.Del(putKey)
4568+
if err := txn.Run(ctx, b); err != nil {
4569+
return err
4570+
}
4571+
require.Len(t, b.Results, 2)
4572+
putResult := b.Results[1]
4573+
require.NotNil(t, putResult.ResumeSpan, "Put response should have a non-nil ResumeSpan")
4574+
return nil
4575+
})
4576+
require.NoError(t, err)
4577+
}
4578+
4579+
// TestMaxSpanRequestKeysWithMixedReadWriteBatches tests whether
4580+
// MaxSpanRequestKeys is respected for certain read-write batches. This test
4581+
// currently asserts that it _isn't_ respected, demonstrating a bug discovered
4582+
// when writing TestUnprocessedWritesHaveResumeSpanSet and filed as #153397.
4583+
func TestMaxSpanRequestKeysWithMixedReadWriteBatches(t *testing.T) {
4584+
defer leaktest.AfterTest(t)()
4585+
defer log.Scope(t).Close(t)
4586+
ctx := context.Background()
4587+
4588+
s, db := startNoSplitMergeServer(t)
4589+
defer s.Stopper().Stop(ctx)
4590+
4591+
var (
4592+
startKey = "a"
4593+
endKey = "b"
4594+
)
4595+
4596+
require.NoError(t, db.Put(ctx, startKey, "existing-value"))
4597+
4598+
err := db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
4599+
b := txn.NewBatch()
4600+
b.Header.MaxSpanRequestKeys = 1
4601+
b.Scan(startKey, endKey)
4602+
b.ScanForUpdate(startKey, endKey, kvpb.GuaranteedDurability)
4603+
if err := txn.Run(ctx, b); err != nil {
4604+
return err
4605+
}
4606+
require.Len(t, b.Results, 2)
4607+
require.Nil(t, b.Results[0].ResumeSpan, "first scan should be fully processed")
4608+
// TODO(#153397): The second scan here should not have been fully processed.
4609+
require.Nil(t, b.Results[1].ResumeSpan, "second scan should not be fully processed")
4610+
return nil
4611+
})
4612+
require.NoError(t, err)
4613+
}

0 commit comments

Comments
 (0)