Skip to content

Commit 2d1f748

Browse files
committed
kv,sql: explicitly set IsReverse on the Header
When processing batches that touch multiple ranges, the DistSender needs to know whether to iterate across those ranges in the forward or reverse manner (i.e. ASC or DESC for range keys). Currently, it only uses the reverse direction if it finds at least one ReverseScan request in the batch, and it uses the forward direction otherwise. This goes against the needs of SQL which might issue point Gets when performing a SQL revscan operation, and currently in such a scenario we would hit an error (instead of returning the results in incorrect order). This commit fixes the issue by introducing `IsReverse` boolean on the batch header to explicitly indicate the direction for range iteration. It seems reasonable that the caller should be explicit about this, and we also add a validation that the boolean is set correctly (meaning that it should be `false` when only forward range requests are present and `true` when only reverse range requests are present). In order to simplify the story a bit, the DistSender will no longer allow batches that have both forward and reverse range requests. Previously, this limitation only applied to the batches with limits, but now it's extended to be unconditional. SQL never issues such batches, so the limitation seems acceptable. This limitation required some updates to the existing tests, including KVNemesis to not generate batches that are now disallowed. See also 619f395 for some related context. Given that the new header field is only examined on the KV client, the change can be backported with no mixed-version concerns. Release note (bug fix): Previously, CockroachDB could hit an error `ERROR: span with results after resume span...` when evaluating some queries with ORDER BY ... DESC in an edge case. The bug has been present since about v22.1 and is now fixed.
1 parent 9e38ea1 commit 2d1f748

File tree

19 files changed

+165
-102
lines changed

19 files changed

+165
-102
lines changed

pkg/kv/batch.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -364,6 +364,8 @@ func (b *Batch) AddRawRequest(reqs ...kvpb.Request) {
364364
*kvpb.IncrementRequest,
365365
*kvpb.DeleteRequest:
366366
numRows = 1
367+
case *kvpb.ReverseScanRequest:
368+
b.Header.IsReverse = true
367369
}
368370
b.appendReqs(args)
369371
b.initResult(1 /* calls */, numRows, raw, nil)
@@ -734,6 +736,7 @@ func (b *Batch) scan(
734736
str kvpb.KeyLockingStrengthType,
735737
dur kvpb.KeyLockingDurabilityType,
736738
) {
739+
b.Header.IsReverse = isReverse
737740
begin, err := marshalKey(s)
738741
if err != nil {
739742
b.initResult(0, 0, notRaw, err)

pkg/kv/db.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -499,6 +499,7 @@ func (db *DB) scan(
499499
if maxRows > 0 {
500500
b.Header.MaxSpanRequestKeys = maxRows
501501
}
502+
b.Header.IsReverse = isReverse
502503
b.scan(begin, end, isReverse, str, dur)
503504
r, err := getOneResult(db.Run(ctx, b), b)
504505
return r.Rows, err

pkg/kv/kvclient/kvcoord/dist_sender.go

Lines changed: 55 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -1079,35 +1079,46 @@ func (ds *DistSender) initAndVerifyBatch(ctx context.Context, ba *kvpb.BatchRequ
10791079
return kvpb.NewErrorf("empty batch")
10801080
}
10811081

1082-
if ba.MaxSpanRequestKeys != 0 || ba.TargetBytes != 0 {
1083-
// Verify that the batch contains only specific requests. Verify that a
1084-
// batch with a ReverseScan only contains ReverseScan range requests.
1085-
var foundForward, foundReverse bool
1086-
for _, req := range ba.Requests {
1087-
inner := req.GetInner()
1088-
switch inner.(type) {
1089-
case *kvpb.ScanRequest, *kvpb.ResolveIntentRangeRequest,
1090-
*kvpb.DeleteRangeRequest, *kvpb.RevertRangeRequest,
1091-
*kvpb.ExportRequest, *kvpb.QueryLocksRequest, *kvpb.IsSpanEmptyRequest:
1092-
// Accepted forward range requests.
1093-
foundForward = true
1082+
// Verify that forward and reverse range requests are never in the same
1083+
// batch. Also verify that the batch with limits contains only specific
1084+
// requests.
1085+
var foundForward, foundReverse bool
1086+
var disallowedReq string
1087+
for _, req := range ba.Requests {
1088+
inner := req.GetInner()
1089+
switch inner.(type) {
1090+
case *kvpb.ScanRequest, *kvpb.ResolveIntentRangeRequest,
1091+
*kvpb.DeleteRangeRequest, *kvpb.RevertRangeRequest,
1092+
*kvpb.ExportRequest, *kvpb.QueryLocksRequest, *kvpb.IsSpanEmptyRequest:
1093+
// Accepted forward range requests.
1094+
foundForward = true
10941095

1095-
case *kvpb.ReverseScanRequest:
1096-
// Accepted reverse range requests.
1097-
foundReverse = true
1096+
case *kvpb.ReverseScanRequest:
1097+
// Accepted reverse range requests.
1098+
foundReverse = true
10981099

1099-
case *kvpb.QueryIntentRequest, *kvpb.EndTxnRequest,
1100-
*kvpb.GetRequest, *kvpb.ResolveIntentRequest, *kvpb.DeleteRequest, *kvpb.PutRequest:
1101-
// Accepted point requests that can be in batches with limit.
1100+
case *kvpb.QueryIntentRequest, *kvpb.EndTxnRequest,
1101+
*kvpb.GetRequest, *kvpb.ResolveIntentRequest, *kvpb.DeleteRequest, *kvpb.PutRequest:
1102+
// Accepted point requests that can be in batches with limit. No
1103+
// need to set disallowedReq.
11021104

1103-
default:
1104-
return kvpb.NewErrorf("batch with limit contains %s request", inner.Method())
1105-
}
1106-
}
1107-
if foundForward && foundReverse {
1108-
return kvpb.NewErrorf("batch with limit contains both forward and reverse scans")
1105+
default:
1106+
disallowedReq = inner.Method().String()
11091107
}
11101108
}
1109+
if foundForward && foundReverse {
1110+
return kvpb.NewErrorf("batch contains both forward and reverse requests")
1111+
}
1112+
if (ba.MaxSpanRequestKeys != 0 || ba.TargetBytes != 0) && disallowedReq != "" {
1113+
return kvpb.NewErrorf("batch with limit contains %s request", disallowedReq)
1114+
}
1115+
// Also verify that IsReverse is set accordingly on the batch header.
1116+
if foundForward && ba.Header.IsReverse {
1117+
return kvpb.NewErrorf("batch contains forward requests but IsReverse is set")
1118+
}
1119+
if foundReverse && !ba.Header.IsReverse {
1120+
return kvpb.NewErrorf("batch contains reverse requests but IsReverse is not set")
1121+
}
11111122

11121123
switch ba.WaitPolicy {
11131124
case lock.WaitPolicy_Block, lock.WaitPolicy_Error:
@@ -1249,7 +1260,6 @@ func (ds *DistSender) Send(
12491260
if err != nil {
12501261
return nil, kvpb.NewError(err)
12511262
}
1252-
isReverse := ba.IsReverse()
12531263

12541264
// Determine whether this part of the BatchRequest contains a committing
12551265
// EndTxn request.
@@ -1263,9 +1273,9 @@ func (ds *DistSender) Send(
12631273
var rpl *kvpb.BatchResponse
12641274
var pErr *kvpb.Error
12651275
if withParallelCommit {
1266-
rpl, pErr = ds.divideAndSendParallelCommit(ctx, ba, rs, isReverse, 0 /* batchIdx */)
1276+
rpl, pErr = ds.divideAndSendParallelCommit(ctx, ba, rs, 0 /* batchIdx */)
12671277
} else {
1268-
rpl, pErr = ds.divideAndSendBatchToRanges(ctx, ba, rs, isReverse, withCommit, 0 /* batchIdx */)
1278+
rpl, pErr = ds.divideAndSendBatchToRanges(ctx, ba, rs, withCommit, 0 /* batchIdx */)
12691279
}
12701280

12711281
if pErr == errNo1PCTxn {
@@ -1456,7 +1466,7 @@ type response struct {
14561466
// method is never invoked recursively, but it is exposed to maintain symmetry
14571467
// with divideAndSendBatchToRanges.
14581468
func (ds *DistSender) divideAndSendParallelCommit(
1459-
ctx context.Context, ba *kvpb.BatchRequest, rs roachpb.RSpan, isReverse bool, batchIdx int,
1469+
ctx context.Context, ba *kvpb.BatchRequest, rs roachpb.RSpan, batchIdx int,
14601470
) (br *kvpb.BatchResponse, pErr *kvpb.Error) {
14611471
// Search backwards, looking for the first pre-commit QueryIntent.
14621472
swapIdx := -1
@@ -1472,7 +1482,7 @@ func (ds *DistSender) divideAndSendParallelCommit(
14721482
if swapIdx == -1 {
14731483
// No pre-commit QueryIntents. Nothing to split.
14741484
log.VEvent(ctx, 3, "no pre-commit QueryIntents found, sending batch as-is")
1475-
return ds.divideAndSendBatchToRanges(ctx, ba, rs, isReverse, true /* withCommit */, batchIdx)
1485+
return ds.divideAndSendBatchToRanges(ctx, ba, rs, true /* withCommit */, batchIdx)
14761486
}
14771487

14781488
// Swap the EndTxn request and the first pre-commit QueryIntent. This
@@ -1499,7 +1509,8 @@ func (ds *DistSender) divideAndSendParallelCommit(
14991509
if err != nil {
15001510
return br, kvpb.NewError(err)
15011511
}
1502-
qiIsReverse := false // QueryIntentRequests do not carry the isReverse flag
1512+
// No need to process QueryIntentRequests in the reverse order.
1513+
qiBa.IsReverse = false
15031514
qiBatchIdx := batchIdx + 1
15041515
qiResponseCh := make(chan response, 1)
15051516

@@ -1525,7 +1536,7 @@ func (ds *DistSender) divideAndSendParallelCommit(
15251536

15261537
// Send the batch with withCommit=true since it will be inflight
15271538
// concurrently with the EndTxn batch below.
1528-
reply, pErr := ds.divideAndSendBatchToRanges(ctx, qiBa, qiRS, qiIsReverse, true /* withCommit */, qiBatchIdx)
1539+
reply, pErr := ds.divideAndSendBatchToRanges(ctx, qiBa, qiRS, true /* withCommit */, qiBatchIdx)
15291540
qiResponseCh <- response{reply: reply, positions: positions, pErr: pErr}
15301541
}
15311542

@@ -1556,10 +1567,7 @@ func (ds *DistSender) divideAndSendParallelCommit(
15561567
if err != nil {
15571568
return nil, kvpb.NewError(err)
15581569
}
1559-
// Note that we don't need to recompute isReverse for the updated batch
1560-
// since we only separated out QueryIntentRequests which don't carry the
1561-
// isReverse flag.
1562-
br, pErr = ds.divideAndSendBatchToRanges(ctx, ba, rs, isReverse, true /* withCommit */, batchIdx)
1570+
br, pErr = ds.divideAndSendBatchToRanges(ctx, ba, rs, true /* withCommit */, batchIdx)
15631571

15641572
// Wait for the QueryIntent-only batch to complete and stitch
15651573
// the responses together.
@@ -1728,12 +1736,6 @@ func mergeErrors(pErr1, pErr2 *kvpb.Error) *kvpb.Error {
17281736
// is trimmed against each range which is part of the span and sent
17291737
// either serially or in parallel, if possible.
17301738
//
1731-
// isReverse indicates the direction that the provided span should be
1732-
// iterated over while sending requests. It is passed in by callers
1733-
// instead of being recomputed based on the requests in the batch to
1734-
// prevent the iteration direction from switching midway through a
1735-
// batch, in cases where partial batches recurse into this function.
1736-
//
17371739
// withCommit indicates that the batch contains a transaction commit
17381740
// or that a transaction commit is being run concurrently with this
17391741
// batch. Either way, if this is true then sendToReplicas will need
@@ -1743,12 +1745,7 @@ func mergeErrors(pErr1, pErr2 *kvpb.Error) *kvpb.Error {
17431745
// being processed by this method. It's specified as non-zero when
17441746
// this method is invoked recursively.
17451747
func (ds *DistSender) divideAndSendBatchToRanges(
1746-
ctx context.Context,
1747-
ba *kvpb.BatchRequest,
1748-
rs roachpb.RSpan,
1749-
isReverse bool,
1750-
withCommit bool,
1751-
batchIdx int,
1748+
ctx context.Context, ba *kvpb.BatchRequest, rs roachpb.RSpan, withCommit bool, batchIdx int,
17521749
) (br *kvpb.BatchResponse, pErr *kvpb.Error) {
17531750
// Clone the BatchRequest's transaction so that future mutations to the
17541751
// proto don't affect the proto in this batch.
@@ -1758,7 +1755,7 @@ func (ds *DistSender) divideAndSendBatchToRanges(
17581755
// Get initial seek key depending on direction of iteration.
17591756
var scanDir ScanDirection
17601757
var seekKey roachpb.RKey
1761-
if !isReverse {
1758+
if !ba.IsReverse {
17621759
scanDir = Ascending
17631760
seekKey = rs.Key
17641761
} else {
@@ -1773,7 +1770,7 @@ func (ds *DistSender) divideAndSendBatchToRanges(
17731770
// Take the fast path if this batch fits within a single range.
17741771
if !ri.NeedAnother(rs) {
17751772
resp := ds.sendPartialBatch(
1776-
ctx, ba, rs, isReverse, withCommit, batchIdx, ri.Token(),
1773+
ctx, ba, rs, withCommit, batchIdx, ri.Token(),
17771774
)
17781775
// resp.positions remains nil since the original batch is fully
17791776
// contained within a single range.
@@ -1895,7 +1892,7 @@ func (ds *DistSender) divideAndSendBatchToRanges(
18951892
}
18961893

18971894
if pErr == nil && couldHaveSkippedResponses {
1898-
fillSkippedResponses(ba, br, seekKey, resumeReason, isReverse)
1895+
fillSkippedResponses(ba, br, seekKey, resumeReason)
18991896
}
19001897
}()
19011898

@@ -1975,7 +1972,7 @@ func (ds *DistSender) divideAndSendBatchToRanges(
19751972
// If we can reserve one of the limited goroutines available for parallel
19761973
// batch RPCs, send asynchronously.
19771974
if canParallelize && !lastRange && !ds.disableParallelBatches {
1978-
if ds.sendPartialBatchAsync(ctx, curRangeBatch, curRangeRS, isReverse, withCommit, batchIdx, ri.Token(), responseCh, positions) {
1975+
if ds.sendPartialBatchAsync(ctx, curRangeBatch, curRangeRS, withCommit, batchIdx, ri.Token(), responseCh, positions) {
19791976
asyncSent = true
19801977
} else {
19811978
asyncThrottled = true
@@ -1990,7 +1987,7 @@ func (ds *DistSender) divideAndSendBatchToRanges(
19901987
}()
19911988
}
19921989
return ds.sendPartialBatch(
1993-
ctx, curRangeBatch, curRangeRS, isReverse, withCommit, batchIdx, ri.Token(),
1990+
ctx, curRangeBatch, curRangeRS, withCommit, batchIdx, ri.Token(),
19941991
)
19951992
}()
19961993
resp.positions = positions
@@ -2077,7 +2074,6 @@ func (ds *DistSender) sendPartialBatchAsync(
20772074
ctx context.Context,
20782075
ba *kvpb.BatchRequest,
20792076
rs roachpb.RSpan,
2080-
isReverse bool,
20812077
withCommit bool,
20822078
batchIdx int,
20832079
routing rangecache.EvictionToken,
@@ -2099,7 +2095,7 @@ func (ds *DistSender) sendPartialBatchAsync(
20992095
ds.metrics.AsyncSentCount.Inc(1)
21002096
ds.metrics.AsyncInProgress.Inc(1)
21012097
defer ds.metrics.AsyncInProgress.Dec(1)
2102-
resp := ds.sendPartialBatch(ctx, ba, rs, isReverse, withCommit, batchIdx, routing)
2098+
resp := ds.sendPartialBatch(ctx, ba, rs, withCommit, batchIdx, routing)
21032099
resp.positions = positions
21042100
responseCh <- resp
21052101
}(ctx)
@@ -2163,7 +2159,6 @@ func (ds *DistSender) sendPartialBatch(
21632159
ctx context.Context,
21642160
ba *kvpb.BatchRequest,
21652161
rs roachpb.RSpan,
2166-
isReverse bool,
21672162
withCommit bool,
21682163
batchIdx int,
21692164
routingTok rangecache.EvictionToken,
@@ -2191,7 +2186,7 @@ func (ds *DistSender) sendPartialBatch(
21912186

21922187
if !routingTok.Valid() {
21932188
var descKey roachpb.RKey
2194-
if isReverse {
2189+
if ba.IsReverse {
21952190
descKey = rs.EndKey
21962191
} else {
21972192
descKey = rs.Key
@@ -2207,7 +2202,7 @@ func (ds *DistSender) sendPartialBatch(
22072202
// replica, while detecting hazardous cases where the follower does
22082203
// not have the latest information and the current descriptor did
22092204
// not result in a successful send.
2210-
routingTok, err = ds.getRoutingInfo(ctx, descKey, prevTok, isReverse)
2205+
routingTok, err = ds.getRoutingInfo(ctx, descKey, prevTok, ba.IsReverse)
22112206
if err != nil {
22122207
log.VErrEventf(ctx, 1, "range descriptor re-lookup failed: %s", err)
22132208
// We set pErr if we encountered an error getting the descriptor in
@@ -2230,7 +2225,7 @@ func (ds *DistSender) sendPartialBatch(
22302225
}
22312226
if !intersection.Equal(rs) {
22322227
log.Eventf(ctx, "range shrunk; sub-dividing the request")
2233-
reply, pErr = ds.divideAndSendBatchToRanges(ctx, ba, rs, isReverse, withCommit, batchIdx)
2228+
reply, pErr = ds.divideAndSendBatchToRanges(ctx, ba, rs, withCommit, batchIdx)
22342229
return response{reply: reply, pErr: pErr}
22352230
}
22362231
}
@@ -2332,7 +2327,7 @@ func (ds *DistSender) sendPartialBatch(
23322327
// batch here would give a potentially larger response slice
23332328
// with unknown mapping to our truncated reply).
23342329
log.VEventf(ctx, 1, "likely split; will resend. Got new descriptors: %s", tErr.Ranges)
2335-
reply, pErr = ds.divideAndSendBatchToRanges(ctx, ba, rs, isReverse, withCommit, batchIdx)
2330+
reply, pErr = ds.divideAndSendBatchToRanges(ctx, ba, rs, withCommit, batchIdx)
23362331
return response{reply: reply, pErr: pErr}
23372332
}
23382333
break
@@ -2381,7 +2376,6 @@ func fillSkippedResponses(
23812376
br *kvpb.BatchResponse,
23822377
nextKey roachpb.RKey,
23832378
resumeReason kvpb.ResumeReason,
2384-
isReverse bool,
23852379
) {
23862380
// Some requests might have no response at all if we used a batch-wide
23872381
// limit; simply create trivial responses for those. Note that any type
@@ -2411,7 +2405,7 @@ func fillSkippedResponses(
24112405
for i, resp := range br.Responses {
24122406
req := ba.Requests[i].GetInner()
24132407
hdr := resp.GetInner().Header()
2414-
maybeSetResumeSpan(req, &hdr, nextKey, isReverse)
2408+
maybeSetResumeSpan(req, &hdr, nextKey, ba.IsReverse)
24152409
if hdr.ResumeSpan != nil {
24162410
hdr.ResumeReason = resumeReason
24172411
}

pkg/kv/kvclient/kvcoord/dist_sender_server_test.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1012,11 +1012,10 @@ func TestMultiRequestBatchWithFwdAndReverseRequests(t *testing.T) {
10121012
t.Fatal(err)
10131013
}
10141014
b := &kv.Batch{}
1015-
b.Header.MaxSpanRequestKeys = 100
10161015
b.Scan("a", "b")
10171016
b.ReverseScan("a", "b")
10181017
if err := db.Run(ctx, b); !testutils.IsError(
1019-
err, "batch with limit contains both forward and reverse scans",
1018+
err, "batch contains both forward and reverse requests",
10201019
) {
10211020
t.Fatal(err)
10221021
}

pkg/kv/kvclient/kvstreamer/streamer.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1398,6 +1398,7 @@ func (w *workerCoordinator) performRequestAsync(
13981398
ba.Header.TargetBytes = targetBytes
13991399
ba.Header.AllowEmpty = !headOfLine
14001400
ba.Header.WholeRowsOfSize = w.s.maxKeysPerRow
1401+
ba.Header.IsReverse = w.s.reverse
14011402
// TODO(yuzefovich): consider setting MaxSpanRequestKeys whenever
14021403
// applicable (#67885).
14031404
ba.AdmissionHeader = w.requestAdmissionHeader

pkg/kv/kvnemesis/applier_test.go

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -252,10 +252,16 @@ func TestApplier(t *testing.T) {
252252
"txn-si-err", step(closureTxn(ClosureTxnType_Commit, isolation.Snapshot, delRange(k2, k4, 1))),
253253
},
254254
{
255-
"batch-mixed", step(batch(put(k2, 2), get(k1), del(k2, 1), del(k3, 1), scan(k1, k3), reverseScanForUpdate(k1, k5))),
255+
"batch-mixed-fwd", step(batch(put(k2, 2), get(k1), del(k2, 1), del(k3, 1), scan(k1, k3))),
256256
},
257257
{
258-
"batch-mixed-err", step(batch(put(k2, 2), getForUpdate(k1), scanForUpdate(k1, k3), reverseScan(k1, k3))),
258+
"batch-mixed-rev", step(batch(put(k2, 2), get(k1), del(k2, 1), del(k3, 1), reverseScanForUpdate(k1, k5))),
259+
},
260+
{
261+
"batch-mixed-err-fwd", step(batch(put(k2, 2), getForUpdate(k1), scanForUpdate(k1, k3))),
262+
},
263+
{
264+
"batch-mixed-err-rev", step(batch(put(k2, 2), getForUpdate(k1), reverseScan(k1, k3))),
259265
},
260266
{
261267
"txn-ssi-commit-mixed", step(closureTxn(ClosureTxnType_Commit, isolation.Serializable, put(k5, 5), batch(put(k6, 6), delRange(k3, k5, 1)))),

pkg/kv/kvnemesis/generator.go

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1484,8 +1484,28 @@ func makeRandBatch(c *ClientOperationConfig) opGenFunc {
14841484
g.registerClientOps(&allowed, c)
14851485
numOps := rng.Intn(4)
14861486
ops := make([]Operation, numOps)
1487-
for i := range ops {
1487+
var addedForwardScan, addedReverseScan bool
1488+
for i := 0; i < numOps; i++ {
14881489
ops[i] = g.selectOp(rng, allowed)
1490+
if ops[i].Scan != nil {
1491+
if !ops[i].Scan.Reverse {
1492+
if addedReverseScan {
1493+
// We cannot include the forward scan into the batch
1494+
// that already contains the reverse scan.
1495+
i--
1496+
continue
1497+
}
1498+
addedForwardScan = true
1499+
} else {
1500+
if addedForwardScan {
1501+
// We cannot include the reverse scan into the batch
1502+
// that already contains the forward scan.
1503+
i--
1504+
continue
1505+
}
1506+
addedReverseScan = true
1507+
}
1508+
}
14891509
}
14901510
return batch(ops...)
14911511
}

pkg/kv/kvnemesis/testdata/TestApplier/batch-mixed-err

Lines changed: 0 additions & 10 deletions
This file was deleted.

0 commit comments

Comments
 (0)