Skip to content

Commit f08e98c

Browse files
authored
Merge pull request #152185 from yuzefovich/backport25.2.5-rc-151774
release-25.2.5-rc: kv,sql: explicitly set IsReverse on the Header
2 parents 000a071 + c4c9e44 commit f08e98c

File tree

19 files changed

+167
-102
lines changed

19 files changed

+167
-102
lines changed

pkg/kv/batch.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -370,6 +370,8 @@ func (b *Batch) AddRawRequest(reqs ...kvpb.Request) {
370370
*kvpb.IncrementRequest,
371371
*kvpb.DeleteRequest:
372372
numRows = 1
373+
case *kvpb.ReverseScanRequest:
374+
b.Header.IsReverse = true
373375
}
374376
b.appendReqs(args)
375377
b.initResult(1 /* calls */, numRows, raw, nil)
@@ -740,6 +742,7 @@ func (b *Batch) scan(
740742
str kvpb.KeyLockingStrengthType,
741743
dur kvpb.KeyLockingDurabilityType,
742744
) {
745+
b.Header.IsReverse = isReverse
743746
begin, err := marshalKey(s)
744747
if err != nil {
745748
b.initResult(0, 0, notRaw, err)

pkg/kv/db.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -499,6 +499,7 @@ func (db *DB) scan(
499499
if maxRows > 0 {
500500
b.Header.MaxSpanRequestKeys = maxRows
501501
}
502+
b.Header.IsReverse = isReverse
502503
b.scan(begin, end, isReverse, str, dur)
503504
r, err := getOneResult(db.Run(ctx, b), b)
504505
return r.Rows, err

pkg/kv/kvclient/kvcoord/dist_sender.go

Lines changed: 55 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -1069,35 +1069,46 @@ func (ds *DistSender) initAndVerifyBatch(ctx context.Context, ba *kvpb.BatchRequ
10691069
return kvpb.NewErrorf("empty batch")
10701070
}
10711071

1072-
if ba.MaxSpanRequestKeys != 0 || ba.TargetBytes != 0 {
1073-
// Verify that the batch contains only specific requests. Verify that a
1074-
// batch with a ReverseScan only contains ReverseScan range requests.
1075-
var foundForward, foundReverse bool
1076-
for _, req := range ba.Requests {
1077-
inner := req.GetInner()
1078-
switch inner.(type) {
1079-
case *kvpb.ScanRequest, *kvpb.ResolveIntentRangeRequest,
1080-
*kvpb.DeleteRangeRequest, *kvpb.RevertRangeRequest,
1081-
*kvpb.ExportRequest, *kvpb.QueryLocksRequest, *kvpb.IsSpanEmptyRequest:
1082-
// Accepted forward range requests.
1083-
foundForward = true
1072+
// Verify that forward and reverse range requests are never in the same
1073+
// batch. Also verify that the batch with limits contains only specific
1074+
// requests.
1075+
var foundForward, foundReverse bool
1076+
var disallowedReq string
1077+
for _, req := range ba.Requests {
1078+
inner := req.GetInner()
1079+
switch inner.(type) {
1080+
case *kvpb.ScanRequest, *kvpb.ResolveIntentRangeRequest,
1081+
*kvpb.DeleteRangeRequest, *kvpb.RevertRangeRequest,
1082+
*kvpb.ExportRequest, *kvpb.QueryLocksRequest, *kvpb.IsSpanEmptyRequest:
1083+
// Accepted forward range requests.
1084+
foundForward = true
10841085

1085-
case *kvpb.ReverseScanRequest:
1086-
// Accepted reverse range requests.
1087-
foundReverse = true
1086+
case *kvpb.ReverseScanRequest:
1087+
// Accepted reverse range requests.
1088+
foundReverse = true
10881089

1089-
case *kvpb.QueryIntentRequest, *kvpb.EndTxnRequest,
1090-
*kvpb.GetRequest, *kvpb.ResolveIntentRequest, *kvpb.DeleteRequest, *kvpb.PutRequest:
1091-
// Accepted point requests that can be in batches with limit.
1090+
case *kvpb.QueryIntentRequest, *kvpb.EndTxnRequest,
1091+
*kvpb.GetRequest, *kvpb.ResolveIntentRequest, *kvpb.DeleteRequest, *kvpb.PutRequest:
1092+
// Accepted point requests that can be in batches with limit. No
1093+
// need to set disallowedReq.
10921094

1093-
default:
1094-
return kvpb.NewErrorf("batch with limit contains %s request", inner.Method())
1095-
}
1096-
}
1097-
if foundForward && foundReverse {
1098-
return kvpb.NewErrorf("batch with limit contains both forward and reverse scans")
1095+
default:
1096+
disallowedReq = inner.Method().String()
10991097
}
11001098
}
1099+
if foundForward && foundReverse {
1100+
return kvpb.NewErrorf("batch contains both forward and reverse requests")
1101+
}
1102+
if (ba.MaxSpanRequestKeys != 0 || ba.TargetBytes != 0) && disallowedReq != "" {
1103+
return kvpb.NewErrorf("batch with limit contains %s request", disallowedReq)
1104+
}
1105+
// Also verify that IsReverse is set accordingly on the batch header.
1106+
if foundForward && ba.Header.IsReverse {
1107+
return kvpb.NewErrorf("batch contains forward requests but IsReverse is set")
1108+
}
1109+
if foundReverse && !ba.Header.IsReverse {
1110+
return kvpb.NewErrorf("batch contains reverse requests but IsReverse is not set")
1111+
}
11011112

11021113
switch ba.WaitPolicy {
11031114
case lock.WaitPolicy_Block, lock.WaitPolicy_Error:
@@ -1239,7 +1250,6 @@ func (ds *DistSender) Send(
12391250
if err != nil {
12401251
return nil, kvpb.NewError(err)
12411252
}
1242-
isReverse := ba.IsReverse()
12431253

12441254
// Determine whether this part of the BatchRequest contains a committing
12451255
// EndTxn request.
@@ -1253,9 +1263,9 @@ func (ds *DistSender) Send(
12531263
var rpl *kvpb.BatchResponse
12541264
var pErr *kvpb.Error
12551265
if withParallelCommit {
1256-
rpl, pErr = ds.divideAndSendParallelCommit(ctx, ba, rs, isReverse, 0 /* batchIdx */)
1266+
rpl, pErr = ds.divideAndSendParallelCommit(ctx, ba, rs, 0 /* batchIdx */)
12571267
} else {
1258-
rpl, pErr = ds.divideAndSendBatchToRanges(ctx, ba, rs, isReverse, withCommit, 0 /* batchIdx */)
1268+
rpl, pErr = ds.divideAndSendBatchToRanges(ctx, ba, rs, withCommit, 0 /* batchIdx */)
12591269
}
12601270

12611271
if pErr == errNo1PCTxn {
@@ -1446,7 +1456,7 @@ type response struct {
14461456
// method is never invoked recursively, but it is exposed to maintain symmetry
14471457
// with divideAndSendBatchToRanges.
14481458
func (ds *DistSender) divideAndSendParallelCommit(
1449-
ctx context.Context, ba *kvpb.BatchRequest, rs roachpb.RSpan, isReverse bool, batchIdx int,
1459+
ctx context.Context, ba *kvpb.BatchRequest, rs roachpb.RSpan, batchIdx int,
14501460
) (br *kvpb.BatchResponse, pErr *kvpb.Error) {
14511461
// Search backwards, looking for the first pre-commit QueryIntent.
14521462
swapIdx := -1
@@ -1462,7 +1472,7 @@ func (ds *DistSender) divideAndSendParallelCommit(
14621472
if swapIdx == -1 {
14631473
// No pre-commit QueryIntents. Nothing to split.
14641474
log.VEvent(ctx, 3, "no pre-commit QueryIntents found, sending batch as-is")
1465-
return ds.divideAndSendBatchToRanges(ctx, ba, rs, isReverse, true /* withCommit */, batchIdx)
1475+
return ds.divideAndSendBatchToRanges(ctx, ba, rs, true /* withCommit */, batchIdx)
14661476
}
14671477

14681478
// Swap the EndTxn request and the first pre-commit QueryIntent. This
@@ -1489,7 +1499,8 @@ func (ds *DistSender) divideAndSendParallelCommit(
14891499
if err != nil {
14901500
return br, kvpb.NewError(err)
14911501
}
1492-
qiIsReverse := false // QueryIntentRequests do not carry the isReverse flag
1502+
// No need to process QueryIntentRequests in the reverse order.
1503+
qiBa.IsReverse = false
14931504
qiBatchIdx := batchIdx + 1
14941505
qiResponseCh := make(chan response, 1)
14951506

@@ -1519,7 +1530,7 @@ func (ds *DistSender) divideAndSendParallelCommit(
15191530

15201531
// Send the batch with withCommit=true since it will be inflight
15211532
// concurrently with the EndTxn batch below.
1522-
reply, pErr := ds.divideAndSendBatchToRanges(ctx, qiBa, qiRS, qiIsReverse, true /* withCommit */, qiBatchIdx)
1533+
reply, pErr := ds.divideAndSendBatchToRanges(ctx, qiBa, qiRS, true /* withCommit */, qiBatchIdx)
15231534
qiResponseCh <- response{reply: reply, positions: positions, pErr: pErr}
15241535
}); err != nil {
15251536
return nil, kvpb.NewError(err)
@@ -1534,10 +1545,7 @@ func (ds *DistSender) divideAndSendParallelCommit(
15341545
if err != nil {
15351546
return nil, kvpb.NewError(err)
15361547
}
1537-
// Note that we don't need to recompute isReverse for the updated batch
1538-
// since we only separated out QueryIntentRequests which don't carry the
1539-
// isReverse flag.
1540-
br, pErr = ds.divideAndSendBatchToRanges(ctx, ba, rs, isReverse, true /* withCommit */, batchIdx)
1548+
br, pErr = ds.divideAndSendBatchToRanges(ctx, ba, rs, true /* withCommit */, batchIdx)
15411549

15421550
// Wait for the QueryIntent-only batch to complete and stitch
15431551
// the responses together.
@@ -1706,12 +1714,6 @@ func mergeErrors(pErr1, pErr2 *kvpb.Error) *kvpb.Error {
17061714
// is trimmed against each range which is part of the span and sent
17071715
// either serially or in parallel, if possible.
17081716
//
1709-
// isReverse indicates the direction that the provided span should be
1710-
// iterated over while sending requests. It is passed in by callers
1711-
// instead of being recomputed based on the requests in the batch to
1712-
// prevent the iteration direction from switching midway through a
1713-
// batch, in cases where partial batches recurse into this function.
1714-
//
17151717
// withCommit indicates that the batch contains a transaction commit
17161718
// or that a transaction commit is being run concurrently with this
17171719
// batch. Either way, if this is true then sendToReplicas will need
@@ -1721,12 +1723,7 @@ func mergeErrors(pErr1, pErr2 *kvpb.Error) *kvpb.Error {
17211723
// being processed by this method. It's specified as non-zero when
17221724
// this method is invoked recursively.
17231725
func (ds *DistSender) divideAndSendBatchToRanges(
1724-
ctx context.Context,
1725-
ba *kvpb.BatchRequest,
1726-
rs roachpb.RSpan,
1727-
isReverse bool,
1728-
withCommit bool,
1729-
batchIdx int,
1726+
ctx context.Context, ba *kvpb.BatchRequest, rs roachpb.RSpan, withCommit bool, batchIdx int,
17301727
) (br *kvpb.BatchResponse, pErr *kvpb.Error) {
17311728
// Clone the BatchRequest's transaction so that future mutations to the
17321729
// proto don't affect the proto in this batch.
@@ -1736,7 +1733,7 @@ func (ds *DistSender) divideAndSendBatchToRanges(
17361733
// Get initial seek key depending on direction of iteration.
17371734
var scanDir ScanDirection
17381735
var seekKey roachpb.RKey
1739-
if !isReverse {
1736+
if !ba.IsReverse {
17401737
scanDir = Ascending
17411738
seekKey = rs.Key
17421739
} else {
@@ -1751,7 +1748,7 @@ func (ds *DistSender) divideAndSendBatchToRanges(
17511748
// Take the fast path if this batch fits within a single range.
17521749
if !ri.NeedAnother(rs) {
17531750
resp := ds.sendPartialBatch(
1754-
ctx, ba, rs, isReverse, withCommit, batchIdx, ri.Token(),
1751+
ctx, ba, rs, withCommit, batchIdx, ri.Token(),
17551752
)
17561753
// resp.positions remains nil since the original batch is fully
17571754
// contained within a single range.
@@ -1873,7 +1870,7 @@ func (ds *DistSender) divideAndSendBatchToRanges(
18731870
}
18741871

18751872
if pErr == nil && couldHaveSkippedResponses {
1876-
fillSkippedResponses(ba, br, seekKey, resumeReason, isReverse)
1873+
fillSkippedResponses(ba, br, seekKey, resumeReason)
18771874
}
18781875
}()
18791876

@@ -1953,7 +1950,7 @@ func (ds *DistSender) divideAndSendBatchToRanges(
19531950
// If we can reserve one of the limited goroutines available for parallel
19541951
// batch RPCs, send asynchronously.
19551952
if canParallelize && !lastRange && !ds.disableParallelBatches {
1956-
if ds.sendPartialBatchAsync(ctx, curRangeBatch, curRangeRS, isReverse, withCommit, batchIdx, ri.Token(), responseCh, positions) {
1953+
if ds.sendPartialBatchAsync(ctx, curRangeBatch, curRangeRS, withCommit, batchIdx, ri.Token(), responseCh, positions) {
19571954
asyncSent = true
19581955
} else {
19591956
asyncThrottled = true
@@ -1968,7 +1965,7 @@ func (ds *DistSender) divideAndSendBatchToRanges(
19681965
}()
19691966
}
19701967
return ds.sendPartialBatch(
1971-
ctx, curRangeBatch, curRangeRS, isReverse, withCommit, batchIdx, ri.Token(),
1968+
ctx, curRangeBatch, curRangeRS, withCommit, batchIdx, ri.Token(),
19721969
)
19731970
}()
19741971
resp.positions = positions
@@ -2055,7 +2052,6 @@ func (ds *DistSender) sendPartialBatchAsync(
20552052
ctx context.Context,
20562053
ba *kvpb.BatchRequest,
20572054
rs roachpb.RSpan,
2058-
isReverse bool,
20592055
withCommit bool,
20602056
batchIdx int,
20612057
routing rangecache.EvictionToken,
@@ -2074,7 +2070,7 @@ func (ds *DistSender) sendPartialBatchAsync(
20742070
ds.metrics.AsyncSentCount.Inc(1)
20752071
ds.metrics.AsyncInProgress.Inc(1)
20762072
defer ds.metrics.AsyncInProgress.Dec(1)
2077-
resp := ds.sendPartialBatch(ctx, ba, rs, isReverse, withCommit, batchIdx, routing)
2073+
resp := ds.sendPartialBatch(ctx, ba, rs, withCommit, batchIdx, routing)
20782074
resp.positions = positions
20792075
responseCh <- resp
20802076
},
@@ -2142,7 +2138,6 @@ func (ds *DistSender) sendPartialBatch(
21422138
ctx context.Context,
21432139
ba *kvpb.BatchRequest,
21442140
rs roachpb.RSpan,
2145-
isReverse bool,
21462141
withCommit bool,
21472142
batchIdx int,
21482143
routingTok rangecache.EvictionToken,
@@ -2170,7 +2165,7 @@ func (ds *DistSender) sendPartialBatch(
21702165

21712166
if !routingTok.Valid() {
21722167
var descKey roachpb.RKey
2173-
if isReverse {
2168+
if ba.IsReverse {
21742169
descKey = rs.EndKey
21752170
} else {
21762171
descKey = rs.Key
@@ -2186,7 +2181,7 @@ func (ds *DistSender) sendPartialBatch(
21862181
// replica, while detecting hazardous cases where the follower does
21872182
// not have the latest information and the current descriptor did
21882183
// not result in a successful send.
2189-
routingTok, err = ds.getRoutingInfo(ctx, descKey, prevTok, isReverse)
2184+
routingTok, err = ds.getRoutingInfo(ctx, descKey, prevTok, ba.IsReverse)
21902185
if err != nil {
21912186
log.VErrEventf(ctx, 1, "range descriptor re-lookup failed: %s", err)
21922187
// We set pErr if we encountered an error getting the descriptor in
@@ -2209,7 +2204,7 @@ func (ds *DistSender) sendPartialBatch(
22092204
}
22102205
if !intersection.Equal(rs) {
22112206
log.Eventf(ctx, "range shrunk; sub-dividing the request")
2212-
reply, pErr = ds.divideAndSendBatchToRanges(ctx, ba, rs, isReverse, withCommit, batchIdx)
2207+
reply, pErr = ds.divideAndSendBatchToRanges(ctx, ba, rs, withCommit, batchIdx)
22132208
return response{reply: reply, pErr: pErr}
22142209
}
22152210
}
@@ -2311,7 +2306,7 @@ func (ds *DistSender) sendPartialBatch(
23112306
// batch here would give a potentially larger response slice
23122307
// with unknown mapping to our truncated reply).
23132308
log.VEventf(ctx, 1, "likely split; will resend. Got new descriptors: %s", tErr.Ranges)
2314-
reply, pErr = ds.divideAndSendBatchToRanges(ctx, ba, rs, isReverse, withCommit, batchIdx)
2309+
reply, pErr = ds.divideAndSendBatchToRanges(ctx, ba, rs, withCommit, batchIdx)
23152310
return response{reply: reply, pErr: pErr}
23162311
}
23172312
break
@@ -2360,7 +2355,6 @@ func fillSkippedResponses(
23602355
br *kvpb.BatchResponse,
23612356
nextKey roachpb.RKey,
23622357
resumeReason kvpb.ResumeReason,
2363-
isReverse bool,
23642358
) {
23652359
// Some requests might have no response at all if we used a batch-wide
23662360
// limit; simply create trivial responses for those. Note that any type
@@ -2390,7 +2384,7 @@ func fillSkippedResponses(
23902384
for i, resp := range br.Responses {
23912385
req := ba.Requests[i].GetInner()
23922386
hdr := resp.GetInner().Header()
2393-
maybeSetResumeSpan(req, &hdr, nextKey, isReverse)
2387+
maybeSetResumeSpan(req, &hdr, nextKey, ba.IsReverse)
23942388
if hdr.ResumeSpan != nil {
23952389
hdr.ResumeReason = resumeReason
23962390
}

pkg/kv/kvclient/kvcoord/dist_sender_server_test.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1009,11 +1009,10 @@ func TestMultiRequestBatchWithFwdAndReverseRequests(t *testing.T) {
10091009
t.Fatal(err)
10101010
}
10111011
b := &kv.Batch{}
1012-
b.Header.MaxSpanRequestKeys = 100
10131012
b.Scan("a", "b")
10141013
b.ReverseScan("a", "b")
10151014
if err := db.Run(ctx, b); !testutils.IsError(
1016-
err, "batch with limit contains both forward and reverse scans",
1015+
err, "batch contains both forward and reverse requests",
10171016
) {
10181017
t.Fatal(err)
10191018
}

pkg/kv/kvclient/kvstreamer/streamer.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1409,6 +1409,7 @@ func (w *workerCoordinator) performRequestAsync(
14091409
ba.Header.TargetBytes = targetBytes
14101410
ba.Header.AllowEmpty = !headOfLine
14111411
ba.Header.WholeRowsOfSize = w.s.maxKeysPerRow
1412+
ba.Header.IsReverse = w.s.reverse
14121413
// TODO(yuzefovich): consider setting MaxSpanRequestKeys whenever
14131414
// applicable (#67885).
14141415
ba.AdmissionHeader = w.requestAdmissionHeader

pkg/kv/kvnemesis/applier_test.go

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -252,10 +252,16 @@ func TestApplier(t *testing.T) {
252252
"txn-si-err", step(closureTxn(ClosureTxnType_Commit, isolation.Snapshot, delRange(k2, k4, 1))),
253253
},
254254
{
255-
"batch-mixed", step(batch(put(k2, 2), get(k1), del(k2, 1), del(k3, 1), scan(k1, k3), reverseScanForUpdate(k1, k5))),
255+
"batch-mixed-fwd", step(batch(put(k2, 2), get(k1), del(k2, 1), del(k3, 1), scan(k1, k3))),
256256
},
257257
{
258-
"batch-mixed-err", step(batch(put(k2, 2), getForUpdate(k1), scanForUpdate(k1, k3), reverseScan(k1, k3))),
258+
"batch-mixed-rev", step(batch(put(k2, 2), get(k1), del(k2, 1), del(k3, 1), reverseScanForUpdate(k1, k5))),
259+
},
260+
{
261+
"batch-mixed-err-fwd", step(batch(put(k2, 2), getForUpdate(k1), scanForUpdate(k1, k3))),
262+
},
263+
{
264+
"batch-mixed-err-rev", step(batch(put(k2, 2), getForUpdate(k1), reverseScan(k1, k3))),
259265
},
260266
{
261267
"txn-ssi-commit-mixed", step(closureTxn(ClosureTxnType_Commit, isolation.Serializable, put(k5, 5), batch(put(k6, 6), delRange(k3, k5, 1)))),

pkg/kv/kvnemesis/generator.go

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1484,8 +1484,28 @@ func makeRandBatch(c *ClientOperationConfig) opGenFunc {
14841484
g.registerClientOps(&allowed, c)
14851485
numOps := rng.Intn(4)
14861486
ops := make([]Operation, numOps)
1487-
for i := range ops {
1487+
var addedForwardScan, addedReverseScan bool
1488+
for i := 0; i < numOps; i++ {
14881489
ops[i] = g.selectOp(rng, allowed)
1490+
if ops[i].Scan != nil {
1491+
if !ops[i].Scan.Reverse {
1492+
if addedReverseScan {
1493+
// We cannot include the forward scan into the batch
1494+
// that already contains the reverse scan.
1495+
i--
1496+
continue
1497+
}
1498+
addedForwardScan = true
1499+
} else {
1500+
if addedForwardScan {
1501+
// We cannot include the reverse scan into the batch
1502+
// that already contains the forward scan.
1503+
i--
1504+
continue
1505+
}
1506+
addedReverseScan = true
1507+
}
1508+
}
14891509
}
14901510
return batch(ops...)
14911511
}

pkg/kv/kvnemesis/testdata/TestApplier/batch-mixed-err

Lines changed: 0 additions & 10 deletions
This file was deleted.

0 commit comments

Comments
 (0)