Skip to content

Commit c4c9e44

Browse files
committed
kv,sql: explicitly set IsReverse on the Header
When processing batches that touch multiple ranges, the DistSender needs to know whether to iterate across those ranges in the forward or the reverse direction (i.e. ASC or DESC for range keys). Currently, it only uses the reverse direction if it finds at least one ReverseScan request in the batch, and it uses the forward direction otherwise. This goes against the needs of SQL, which might issue point Gets when performing a SQL revscan operation; currently, in such a scenario we hit an error (instead of returning the results in an incorrect order).

This commit fixes the issue by introducing an `IsReverse` boolean on the batch header to explicitly indicate the direction for range iteration. It seems reasonable that the caller should be explicit about this, and we also add validation that the boolean is set correctly (meaning that it should be `false` when only forward range requests are present and `true` when only reverse range requests are present).

In order to simplify the story a bit, the DistSender will no longer allow batches that have both forward and reverse range requests. Previously, this limitation only applied to batches with limits, but now it is unconditional. SQL never issues such batches, so the limitation seems acceptable. This limitation required some updates to the existing tests, including KVNemesis, so that they no longer generate batches that are now disallowed.

See also 619f395 for some related context.

Given that the new header field is only examined on the KV client, the change can be backported with no mixed-version concerns.

Release note (bug fix): Previously, CockroachDB could hit an error `ERROR: span with results after resume span...` when evaluating some queries with ORDER BY ... DESC in an edge case. The bug has been present since about v22.1 and is now fixed.
1 parent 000a071 commit c4c9e44

File tree

19 files changed

+167
-102
lines changed

19 files changed

+167
-102
lines changed

pkg/kv/batch.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -370,6 +370,8 @@ func (b *Batch) AddRawRequest(reqs ...kvpb.Request) {
370370
*kvpb.IncrementRequest,
371371
*kvpb.DeleteRequest:
372372
numRows = 1
373+
case *kvpb.ReverseScanRequest:
374+
b.Header.IsReverse = true
373375
}
374376
b.appendReqs(args)
375377
b.initResult(1 /* calls */, numRows, raw, nil)
@@ -740,6 +742,7 @@ func (b *Batch) scan(
740742
str kvpb.KeyLockingStrengthType,
741743
dur kvpb.KeyLockingDurabilityType,
742744
) {
745+
b.Header.IsReverse = isReverse
743746
begin, err := marshalKey(s)
744747
if err != nil {
745748
b.initResult(0, 0, notRaw, err)

pkg/kv/db.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -499,6 +499,7 @@ func (db *DB) scan(
499499
if maxRows > 0 {
500500
b.Header.MaxSpanRequestKeys = maxRows
501501
}
502+
b.Header.IsReverse = isReverse
502503
b.scan(begin, end, isReverse, str, dur)
503504
r, err := getOneResult(db.Run(ctx, b), b)
504505
return r.Rows, err

pkg/kv/kvclient/kvcoord/dist_sender.go

Lines changed: 55 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -1069,35 +1069,46 @@ func (ds *DistSender) initAndVerifyBatch(ctx context.Context, ba *kvpb.BatchRequ
10691069
return kvpb.NewErrorf("empty batch")
10701070
}
10711071

1072-
if ba.MaxSpanRequestKeys != 0 || ba.TargetBytes != 0 {
1073-
// Verify that the batch contains only specific requests. Verify that a
1074-
// batch with a ReverseScan only contains ReverseScan range requests.
1075-
var foundForward, foundReverse bool
1076-
for _, req := range ba.Requests {
1077-
inner := req.GetInner()
1078-
switch inner.(type) {
1079-
case *kvpb.ScanRequest, *kvpb.ResolveIntentRangeRequest,
1080-
*kvpb.DeleteRangeRequest, *kvpb.RevertRangeRequest,
1081-
*kvpb.ExportRequest, *kvpb.QueryLocksRequest, *kvpb.IsSpanEmptyRequest:
1082-
// Accepted forward range requests.
1083-
foundForward = true
1072+
// Verify that forward and reverse range requests are never in the same
1073+
// batch. Also verify that the batch with limits contains only specific
1074+
// requests.
1075+
var foundForward, foundReverse bool
1076+
var disallowedReq string
1077+
for _, req := range ba.Requests {
1078+
inner := req.GetInner()
1079+
switch inner.(type) {
1080+
case *kvpb.ScanRequest, *kvpb.ResolveIntentRangeRequest,
1081+
*kvpb.DeleteRangeRequest, *kvpb.RevertRangeRequest,
1082+
*kvpb.ExportRequest, *kvpb.QueryLocksRequest, *kvpb.IsSpanEmptyRequest:
1083+
// Accepted forward range requests.
1084+
foundForward = true
10841085

1085-
case *kvpb.ReverseScanRequest:
1086-
// Accepted reverse range requests.
1087-
foundReverse = true
1086+
case *kvpb.ReverseScanRequest:
1087+
// Accepted reverse range requests.
1088+
foundReverse = true
10881089

1089-
case *kvpb.QueryIntentRequest, *kvpb.EndTxnRequest,
1090-
*kvpb.GetRequest, *kvpb.ResolveIntentRequest, *kvpb.DeleteRequest, *kvpb.PutRequest:
1091-
// Accepted point requests that can be in batches with limit.
1090+
case *kvpb.QueryIntentRequest, *kvpb.EndTxnRequest,
1091+
*kvpb.GetRequest, *kvpb.ResolveIntentRequest, *kvpb.DeleteRequest, *kvpb.PutRequest:
1092+
// Accepted point requests that can be in batches with limit. No
1093+
// need to set disallowedReq.
10921094

1093-
default:
1094-
return kvpb.NewErrorf("batch with limit contains %s request", inner.Method())
1095-
}
1096-
}
1097-
if foundForward && foundReverse {
1098-
return kvpb.NewErrorf("batch with limit contains both forward and reverse scans")
1095+
default:
1096+
disallowedReq = inner.Method().String()
10991097
}
11001098
}
1099+
if foundForward && foundReverse {
1100+
return kvpb.NewErrorf("batch contains both forward and reverse requests")
1101+
}
1102+
if (ba.MaxSpanRequestKeys != 0 || ba.TargetBytes != 0) && disallowedReq != "" {
1103+
return kvpb.NewErrorf("batch with limit contains %s request", disallowedReq)
1104+
}
1105+
// Also verify that IsReverse is set accordingly on the batch header.
1106+
if foundForward && ba.Header.IsReverse {
1107+
return kvpb.NewErrorf("batch contains forward requests but IsReverse is set")
1108+
}
1109+
if foundReverse && !ba.Header.IsReverse {
1110+
return kvpb.NewErrorf("batch contains reverse requests but IsReverse is not set")
1111+
}
11011112

11021113
switch ba.WaitPolicy {
11031114
case lock.WaitPolicy_Block, lock.WaitPolicy_Error:
@@ -1239,7 +1250,6 @@ func (ds *DistSender) Send(
12391250
if err != nil {
12401251
return nil, kvpb.NewError(err)
12411252
}
1242-
isReverse := ba.IsReverse()
12431253

12441254
// Determine whether this part of the BatchRequest contains a committing
12451255
// EndTxn request.
@@ -1253,9 +1263,9 @@ func (ds *DistSender) Send(
12531263
var rpl *kvpb.BatchResponse
12541264
var pErr *kvpb.Error
12551265
if withParallelCommit {
1256-
rpl, pErr = ds.divideAndSendParallelCommit(ctx, ba, rs, isReverse, 0 /* batchIdx */)
1266+
rpl, pErr = ds.divideAndSendParallelCommit(ctx, ba, rs, 0 /* batchIdx */)
12571267
} else {
1258-
rpl, pErr = ds.divideAndSendBatchToRanges(ctx, ba, rs, isReverse, withCommit, 0 /* batchIdx */)
1268+
rpl, pErr = ds.divideAndSendBatchToRanges(ctx, ba, rs, withCommit, 0 /* batchIdx */)
12591269
}
12601270

12611271
if pErr == errNo1PCTxn {
@@ -1446,7 +1456,7 @@ type response struct {
14461456
// method is never invoked recursively, but it is exposed to maintain symmetry
14471457
// with divideAndSendBatchToRanges.
14481458
func (ds *DistSender) divideAndSendParallelCommit(
1449-
ctx context.Context, ba *kvpb.BatchRequest, rs roachpb.RSpan, isReverse bool, batchIdx int,
1459+
ctx context.Context, ba *kvpb.BatchRequest, rs roachpb.RSpan, batchIdx int,
14501460
) (br *kvpb.BatchResponse, pErr *kvpb.Error) {
14511461
// Search backwards, looking for the first pre-commit QueryIntent.
14521462
swapIdx := -1
@@ -1462,7 +1472,7 @@ func (ds *DistSender) divideAndSendParallelCommit(
14621472
if swapIdx == -1 {
14631473
// No pre-commit QueryIntents. Nothing to split.
14641474
log.VEvent(ctx, 3, "no pre-commit QueryIntents found, sending batch as-is")
1465-
return ds.divideAndSendBatchToRanges(ctx, ba, rs, isReverse, true /* withCommit */, batchIdx)
1475+
return ds.divideAndSendBatchToRanges(ctx, ba, rs, true /* withCommit */, batchIdx)
14661476
}
14671477

14681478
// Swap the EndTxn request and the first pre-commit QueryIntent. This
@@ -1489,7 +1499,8 @@ func (ds *DistSender) divideAndSendParallelCommit(
14891499
if err != nil {
14901500
return br, kvpb.NewError(err)
14911501
}
1492-
qiIsReverse := false // QueryIntentRequests do not carry the isReverse flag
1502+
// No need to process QueryIntentRequests in the reverse order.
1503+
qiBa.IsReverse = false
14931504
qiBatchIdx := batchIdx + 1
14941505
qiResponseCh := make(chan response, 1)
14951506

@@ -1519,7 +1530,7 @@ func (ds *DistSender) divideAndSendParallelCommit(
15191530

15201531
// Send the batch with withCommit=true since it will be inflight
15211532
// concurrently with the EndTxn batch below.
1522-
reply, pErr := ds.divideAndSendBatchToRanges(ctx, qiBa, qiRS, qiIsReverse, true /* withCommit */, qiBatchIdx)
1533+
reply, pErr := ds.divideAndSendBatchToRanges(ctx, qiBa, qiRS, true /* withCommit */, qiBatchIdx)
15231534
qiResponseCh <- response{reply: reply, positions: positions, pErr: pErr}
15241535
}); err != nil {
15251536
return nil, kvpb.NewError(err)
@@ -1534,10 +1545,7 @@ func (ds *DistSender) divideAndSendParallelCommit(
15341545
if err != nil {
15351546
return nil, kvpb.NewError(err)
15361547
}
1537-
// Note that we don't need to recompute isReverse for the updated batch
1538-
// since we only separated out QueryIntentRequests which don't carry the
1539-
// isReverse flag.
1540-
br, pErr = ds.divideAndSendBatchToRanges(ctx, ba, rs, isReverse, true /* withCommit */, batchIdx)
1548+
br, pErr = ds.divideAndSendBatchToRanges(ctx, ba, rs, true /* withCommit */, batchIdx)
15411549

15421550
// Wait for the QueryIntent-only batch to complete and stitch
15431551
// the responses together.
@@ -1706,12 +1714,6 @@ func mergeErrors(pErr1, pErr2 *kvpb.Error) *kvpb.Error {
17061714
// is trimmed against each range which is part of the span and sent
17071715
// either serially or in parallel, if possible.
17081716
//
1709-
// isReverse indicates the direction that the provided span should be
1710-
// iterated over while sending requests. It is passed in by callers
1711-
// instead of being recomputed based on the requests in the batch to
1712-
// prevent the iteration direction from switching midway through a
1713-
// batch, in cases where partial batches recurse into this function.
1714-
//
17151717
// withCommit indicates that the batch contains a transaction commit
17161718
// or that a transaction commit is being run concurrently with this
17171719
// batch. Either way, if this is true then sendToReplicas will need
@@ -1721,12 +1723,7 @@ func mergeErrors(pErr1, pErr2 *kvpb.Error) *kvpb.Error {
17211723
// being processed by this method. It's specified as non-zero when
17221724
// this method is invoked recursively.
17231725
func (ds *DistSender) divideAndSendBatchToRanges(
1724-
ctx context.Context,
1725-
ba *kvpb.BatchRequest,
1726-
rs roachpb.RSpan,
1727-
isReverse bool,
1728-
withCommit bool,
1729-
batchIdx int,
1726+
ctx context.Context, ba *kvpb.BatchRequest, rs roachpb.RSpan, withCommit bool, batchIdx int,
17301727
) (br *kvpb.BatchResponse, pErr *kvpb.Error) {
17311728
// Clone the BatchRequest's transaction so that future mutations to the
17321729
// proto don't affect the proto in this batch.
@@ -1736,7 +1733,7 @@ func (ds *DistSender) divideAndSendBatchToRanges(
17361733
// Get initial seek key depending on direction of iteration.
17371734
var scanDir ScanDirection
17381735
var seekKey roachpb.RKey
1739-
if !isReverse {
1736+
if !ba.IsReverse {
17401737
scanDir = Ascending
17411738
seekKey = rs.Key
17421739
} else {
@@ -1751,7 +1748,7 @@ func (ds *DistSender) divideAndSendBatchToRanges(
17511748
// Take the fast path if this batch fits within a single range.
17521749
if !ri.NeedAnother(rs) {
17531750
resp := ds.sendPartialBatch(
1754-
ctx, ba, rs, isReverse, withCommit, batchIdx, ri.Token(),
1751+
ctx, ba, rs, withCommit, batchIdx, ri.Token(),
17551752
)
17561753
// resp.positions remains nil since the original batch is fully
17571754
// contained within a single range.
@@ -1873,7 +1870,7 @@ func (ds *DistSender) divideAndSendBatchToRanges(
18731870
}
18741871

18751872
if pErr == nil && couldHaveSkippedResponses {
1876-
fillSkippedResponses(ba, br, seekKey, resumeReason, isReverse)
1873+
fillSkippedResponses(ba, br, seekKey, resumeReason)
18771874
}
18781875
}()
18791876

@@ -1953,7 +1950,7 @@ func (ds *DistSender) divideAndSendBatchToRanges(
19531950
// If we can reserve one of the limited goroutines available for parallel
19541951
// batch RPCs, send asynchronously.
19551952
if canParallelize && !lastRange && !ds.disableParallelBatches {
1956-
if ds.sendPartialBatchAsync(ctx, curRangeBatch, curRangeRS, isReverse, withCommit, batchIdx, ri.Token(), responseCh, positions) {
1953+
if ds.sendPartialBatchAsync(ctx, curRangeBatch, curRangeRS, withCommit, batchIdx, ri.Token(), responseCh, positions) {
19571954
asyncSent = true
19581955
} else {
19591956
asyncThrottled = true
@@ -1968,7 +1965,7 @@ func (ds *DistSender) divideAndSendBatchToRanges(
19681965
}()
19691966
}
19701967
return ds.sendPartialBatch(
1971-
ctx, curRangeBatch, curRangeRS, isReverse, withCommit, batchIdx, ri.Token(),
1968+
ctx, curRangeBatch, curRangeRS, withCommit, batchIdx, ri.Token(),
19721969
)
19731970
}()
19741971
resp.positions = positions
@@ -2055,7 +2052,6 @@ func (ds *DistSender) sendPartialBatchAsync(
20552052
ctx context.Context,
20562053
ba *kvpb.BatchRequest,
20572054
rs roachpb.RSpan,
2058-
isReverse bool,
20592055
withCommit bool,
20602056
batchIdx int,
20612057
routing rangecache.EvictionToken,
@@ -2074,7 +2070,7 @@ func (ds *DistSender) sendPartialBatchAsync(
20742070
ds.metrics.AsyncSentCount.Inc(1)
20752071
ds.metrics.AsyncInProgress.Inc(1)
20762072
defer ds.metrics.AsyncInProgress.Dec(1)
2077-
resp := ds.sendPartialBatch(ctx, ba, rs, isReverse, withCommit, batchIdx, routing)
2073+
resp := ds.sendPartialBatch(ctx, ba, rs, withCommit, batchIdx, routing)
20782074
resp.positions = positions
20792075
responseCh <- resp
20802076
},
@@ -2142,7 +2138,6 @@ func (ds *DistSender) sendPartialBatch(
21422138
ctx context.Context,
21432139
ba *kvpb.BatchRequest,
21442140
rs roachpb.RSpan,
2145-
isReverse bool,
21462141
withCommit bool,
21472142
batchIdx int,
21482143
routingTok rangecache.EvictionToken,
@@ -2170,7 +2165,7 @@ func (ds *DistSender) sendPartialBatch(
21702165

21712166
if !routingTok.Valid() {
21722167
var descKey roachpb.RKey
2173-
if isReverse {
2168+
if ba.IsReverse {
21742169
descKey = rs.EndKey
21752170
} else {
21762171
descKey = rs.Key
@@ -2186,7 +2181,7 @@ func (ds *DistSender) sendPartialBatch(
21862181
// replica, while detecting hazardous cases where the follower does
21872182
// not have the latest information and the current descriptor did
21882183
// not result in a successful send.
2189-
routingTok, err = ds.getRoutingInfo(ctx, descKey, prevTok, isReverse)
2184+
routingTok, err = ds.getRoutingInfo(ctx, descKey, prevTok, ba.IsReverse)
21902185
if err != nil {
21912186
log.VErrEventf(ctx, 1, "range descriptor re-lookup failed: %s", err)
21922187
// We set pErr if we encountered an error getting the descriptor in
@@ -2209,7 +2204,7 @@ func (ds *DistSender) sendPartialBatch(
22092204
}
22102205
if !intersection.Equal(rs) {
22112206
log.Eventf(ctx, "range shrunk; sub-dividing the request")
2212-
reply, pErr = ds.divideAndSendBatchToRanges(ctx, ba, rs, isReverse, withCommit, batchIdx)
2207+
reply, pErr = ds.divideAndSendBatchToRanges(ctx, ba, rs, withCommit, batchIdx)
22132208
return response{reply: reply, pErr: pErr}
22142209
}
22152210
}
@@ -2311,7 +2306,7 @@ func (ds *DistSender) sendPartialBatch(
23112306
// batch here would give a potentially larger response slice
23122307
// with unknown mapping to our truncated reply).
23132308
log.VEventf(ctx, 1, "likely split; will resend. Got new descriptors: %s", tErr.Ranges)
2314-
reply, pErr = ds.divideAndSendBatchToRanges(ctx, ba, rs, isReverse, withCommit, batchIdx)
2309+
reply, pErr = ds.divideAndSendBatchToRanges(ctx, ba, rs, withCommit, batchIdx)
23152310
return response{reply: reply, pErr: pErr}
23162311
}
23172312
break
@@ -2360,7 +2355,6 @@ func fillSkippedResponses(
23602355
br *kvpb.BatchResponse,
23612356
nextKey roachpb.RKey,
23622357
resumeReason kvpb.ResumeReason,
2363-
isReverse bool,
23642358
) {
23652359
// Some requests might have no response at all if we used a batch-wide
23662360
// limit; simply create trivial responses for those. Note that any type
@@ -2390,7 +2384,7 @@ func fillSkippedResponses(
23902384
for i, resp := range br.Responses {
23912385
req := ba.Requests[i].GetInner()
23922386
hdr := resp.GetInner().Header()
2393-
maybeSetResumeSpan(req, &hdr, nextKey, isReverse)
2387+
maybeSetResumeSpan(req, &hdr, nextKey, ba.IsReverse)
23942388
if hdr.ResumeSpan != nil {
23952389
hdr.ResumeReason = resumeReason
23962390
}

pkg/kv/kvclient/kvcoord/dist_sender_server_test.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1009,11 +1009,10 @@ func TestMultiRequestBatchWithFwdAndReverseRequests(t *testing.T) {
10091009
t.Fatal(err)
10101010
}
10111011
b := &kv.Batch{}
1012-
b.Header.MaxSpanRequestKeys = 100
10131012
b.Scan("a", "b")
10141013
b.ReverseScan("a", "b")
10151014
if err := db.Run(ctx, b); !testutils.IsError(
1016-
err, "batch with limit contains both forward and reverse scans",
1015+
err, "batch contains both forward and reverse requests",
10171016
) {
10181017
t.Fatal(err)
10191018
}

pkg/kv/kvclient/kvstreamer/streamer.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1409,6 +1409,7 @@ func (w *workerCoordinator) performRequestAsync(
14091409
ba.Header.TargetBytes = targetBytes
14101410
ba.Header.AllowEmpty = !headOfLine
14111411
ba.Header.WholeRowsOfSize = w.s.maxKeysPerRow
1412+
ba.Header.IsReverse = w.s.reverse
14121413
// TODO(yuzefovich): consider setting MaxSpanRequestKeys whenever
14131414
// applicable (#67885).
14141415
ba.AdmissionHeader = w.requestAdmissionHeader

pkg/kv/kvnemesis/applier_test.go

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -252,10 +252,16 @@ func TestApplier(t *testing.T) {
252252
"txn-si-err", step(closureTxn(ClosureTxnType_Commit, isolation.Snapshot, delRange(k2, k4, 1))),
253253
},
254254
{
255-
"batch-mixed", step(batch(put(k2, 2), get(k1), del(k2, 1), del(k3, 1), scan(k1, k3), reverseScanForUpdate(k1, k5))),
255+
"batch-mixed-fwd", step(batch(put(k2, 2), get(k1), del(k2, 1), del(k3, 1), scan(k1, k3))),
256256
},
257257
{
258-
"batch-mixed-err", step(batch(put(k2, 2), getForUpdate(k1), scanForUpdate(k1, k3), reverseScan(k1, k3))),
258+
"batch-mixed-rev", step(batch(put(k2, 2), get(k1), del(k2, 1), del(k3, 1), reverseScanForUpdate(k1, k5))),
259+
},
260+
{
261+
"batch-mixed-err-fwd", step(batch(put(k2, 2), getForUpdate(k1), scanForUpdate(k1, k3))),
262+
},
263+
{
264+
"batch-mixed-err-rev", step(batch(put(k2, 2), getForUpdate(k1), reverseScan(k1, k3))),
259265
},
260266
{
261267
"txn-ssi-commit-mixed", step(closureTxn(ClosureTxnType_Commit, isolation.Serializable, put(k5, 5), batch(put(k6, 6), delRange(k3, k5, 1)))),

pkg/kv/kvnemesis/generator.go

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1484,8 +1484,28 @@ func makeRandBatch(c *ClientOperationConfig) opGenFunc {
14841484
g.registerClientOps(&allowed, c)
14851485
numOps := rng.Intn(4)
14861486
ops := make([]Operation, numOps)
1487-
for i := range ops {
1487+
var addedForwardScan, addedReverseScan bool
1488+
for i := 0; i < numOps; i++ {
14881489
ops[i] = g.selectOp(rng, allowed)
1490+
if ops[i].Scan != nil {
1491+
if !ops[i].Scan.Reverse {
1492+
if addedReverseScan {
1493+
// We cannot include the forward scan into the batch
1494+
// that already contains the reverse scan.
1495+
i--
1496+
continue
1497+
}
1498+
addedForwardScan = true
1499+
} else {
1500+
if addedForwardScan {
1501+
// We cannot include the reverse scan into the batch
1502+
// that already contains the forward scan.
1503+
i--
1504+
continue
1505+
}
1506+
addedReverseScan = true
1507+
}
1508+
}
14891509
}
14901510
return batch(ops...)
14911511
}

pkg/kv/kvnemesis/testdata/TestApplier/batch-mixed-err

Lines changed: 0 additions & 10 deletions
This file was deleted.

0 commit comments

Comments
 (0)