Skip to content

Commit 97da885

Browse files
authored
Merge pull request #152138 from yuzefovich/backport25.3-151774
release-25.3: kv,sql: explicitly set IsReverse on the Header
2 parents 048689e + 2d1f748 commit 97da885

File tree

19 files changed

+165
-102
lines changed

19 files changed

+165
-102
lines changed

pkg/kv/batch.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -364,6 +364,8 @@ func (b *Batch) AddRawRequest(reqs ...kvpb.Request) {
364364
*kvpb.IncrementRequest,
365365
*kvpb.DeleteRequest:
366366
numRows = 1
367+
case *kvpb.ReverseScanRequest:
368+
b.Header.IsReverse = true
367369
}
368370
b.appendReqs(args)
369371
b.initResult(1 /* calls */, numRows, raw, nil)
@@ -734,6 +736,7 @@ func (b *Batch) scan(
734736
str kvpb.KeyLockingStrengthType,
735737
dur kvpb.KeyLockingDurabilityType,
736738
) {
739+
b.Header.IsReverse = isReverse
737740
begin, err := marshalKey(s)
738741
if err != nil {
739742
b.initResult(0, 0, notRaw, err)

pkg/kv/db.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -499,6 +499,7 @@ func (db *DB) scan(
499499
if maxRows > 0 {
500500
b.Header.MaxSpanRequestKeys = maxRows
501501
}
502+
b.Header.IsReverse = isReverse
502503
b.scan(begin, end, isReverse, str, dur)
503504
r, err := getOneResult(db.Run(ctx, b), b)
504505
return r.Rows, err

pkg/kv/kvclient/kvcoord/dist_sender.go

Lines changed: 55 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -1079,35 +1079,46 @@ func (ds *DistSender) initAndVerifyBatch(ctx context.Context, ba *kvpb.BatchRequ
10791079
return kvpb.NewErrorf("empty batch")
10801080
}
10811081

1082-
if ba.MaxSpanRequestKeys != 0 || ba.TargetBytes != 0 {
1083-
// Verify that the batch contains only specific requests. Verify that a
1084-
// batch with a ReverseScan only contains ReverseScan range requests.
1085-
var foundForward, foundReverse bool
1086-
for _, req := range ba.Requests {
1087-
inner := req.GetInner()
1088-
switch inner.(type) {
1089-
case *kvpb.ScanRequest, *kvpb.ResolveIntentRangeRequest,
1090-
*kvpb.DeleteRangeRequest, *kvpb.RevertRangeRequest,
1091-
*kvpb.ExportRequest, *kvpb.QueryLocksRequest, *kvpb.IsSpanEmptyRequest:
1092-
// Accepted forward range requests.
1093-
foundForward = true
1082+
// Verify that forward and reverse range requests are never in the same
1083+
// batch. Also verify that the batch with limits contains only specific
1084+
// requests.
1085+
var foundForward, foundReverse bool
1086+
var disallowedReq string
1087+
for _, req := range ba.Requests {
1088+
inner := req.GetInner()
1089+
switch inner.(type) {
1090+
case *kvpb.ScanRequest, *kvpb.ResolveIntentRangeRequest,
1091+
*kvpb.DeleteRangeRequest, *kvpb.RevertRangeRequest,
1092+
*kvpb.ExportRequest, *kvpb.QueryLocksRequest, *kvpb.IsSpanEmptyRequest:
1093+
// Accepted forward range requests.
1094+
foundForward = true
10941095

1095-
case *kvpb.ReverseScanRequest:
1096-
// Accepted reverse range requests.
1097-
foundReverse = true
1096+
case *kvpb.ReverseScanRequest:
1097+
// Accepted reverse range requests.
1098+
foundReverse = true
10981099

1099-
case *kvpb.QueryIntentRequest, *kvpb.EndTxnRequest,
1100-
*kvpb.GetRequest, *kvpb.ResolveIntentRequest, *kvpb.DeleteRequest, *kvpb.PutRequest:
1101-
// Accepted point requests that can be in batches with limit.
1100+
case *kvpb.QueryIntentRequest, *kvpb.EndTxnRequest,
1101+
*kvpb.GetRequest, *kvpb.ResolveIntentRequest, *kvpb.DeleteRequest, *kvpb.PutRequest:
1102+
// Accepted point requests that can be in batches with limit. No
1103+
// need to set disallowedReq.
11021104

1103-
default:
1104-
return kvpb.NewErrorf("batch with limit contains %s request", inner.Method())
1105-
}
1106-
}
1107-
if foundForward && foundReverse {
1108-
return kvpb.NewErrorf("batch with limit contains both forward and reverse scans")
1105+
default:
1106+
disallowedReq = inner.Method().String()
11091107
}
11101108
}
1109+
if foundForward && foundReverse {
1110+
return kvpb.NewErrorf("batch contains both forward and reverse requests")
1111+
}
1112+
if (ba.MaxSpanRequestKeys != 0 || ba.TargetBytes != 0) && disallowedReq != "" {
1113+
return kvpb.NewErrorf("batch with limit contains %s request", disallowedReq)
1114+
}
1115+
// Also verify that IsReverse is set accordingly on the batch header.
1116+
if foundForward && ba.Header.IsReverse {
1117+
return kvpb.NewErrorf("batch contains forward requests but IsReverse is set")
1118+
}
1119+
if foundReverse && !ba.Header.IsReverse {
1120+
return kvpb.NewErrorf("batch contains reverse requests but IsReverse is not set")
1121+
}
11111122

11121123
switch ba.WaitPolicy {
11131124
case lock.WaitPolicy_Block, lock.WaitPolicy_Error:
@@ -1249,7 +1260,6 @@ func (ds *DistSender) Send(
12491260
if err != nil {
12501261
return nil, kvpb.NewError(err)
12511262
}
1252-
isReverse := ba.IsReverse()
12531263

12541264
// Determine whether this part of the BatchRequest contains a committing
12551265
// EndTxn request.
@@ -1263,9 +1273,9 @@ func (ds *DistSender) Send(
12631273
var rpl *kvpb.BatchResponse
12641274
var pErr *kvpb.Error
12651275
if withParallelCommit {
1266-
rpl, pErr = ds.divideAndSendParallelCommit(ctx, ba, rs, isReverse, 0 /* batchIdx */)
1276+
rpl, pErr = ds.divideAndSendParallelCommit(ctx, ba, rs, 0 /* batchIdx */)
12671277
} else {
1268-
rpl, pErr = ds.divideAndSendBatchToRanges(ctx, ba, rs, isReverse, withCommit, 0 /* batchIdx */)
1278+
rpl, pErr = ds.divideAndSendBatchToRanges(ctx, ba, rs, withCommit, 0 /* batchIdx */)
12691279
}
12701280

12711281
if pErr == errNo1PCTxn {
@@ -1456,7 +1466,7 @@ type response struct {
14561466
// method is never invoked recursively, but it is exposed to maintain symmetry
14571467
// with divideAndSendBatchToRanges.
14581468
func (ds *DistSender) divideAndSendParallelCommit(
1459-
ctx context.Context, ba *kvpb.BatchRequest, rs roachpb.RSpan, isReverse bool, batchIdx int,
1469+
ctx context.Context, ba *kvpb.BatchRequest, rs roachpb.RSpan, batchIdx int,
14601470
) (br *kvpb.BatchResponse, pErr *kvpb.Error) {
14611471
// Search backwards, looking for the first pre-commit QueryIntent.
14621472
swapIdx := -1
@@ -1472,7 +1482,7 @@ func (ds *DistSender) divideAndSendParallelCommit(
14721482
if swapIdx == -1 {
14731483
// No pre-commit QueryIntents. Nothing to split.
14741484
log.VEvent(ctx, 3, "no pre-commit QueryIntents found, sending batch as-is")
1475-
return ds.divideAndSendBatchToRanges(ctx, ba, rs, isReverse, true /* withCommit */, batchIdx)
1485+
return ds.divideAndSendBatchToRanges(ctx, ba, rs, true /* withCommit */, batchIdx)
14761486
}
14771487

14781488
// Swap the EndTxn request and the first pre-commit QueryIntent. This
@@ -1499,7 +1509,8 @@ func (ds *DistSender) divideAndSendParallelCommit(
14991509
if err != nil {
15001510
return br, kvpb.NewError(err)
15011511
}
1502-
qiIsReverse := false // QueryIntentRequests do not carry the isReverse flag
1512+
// No need to process QueryIntentRequests in the reverse order.
1513+
qiBa.IsReverse = false
15031514
qiBatchIdx := batchIdx + 1
15041515
qiResponseCh := make(chan response, 1)
15051516

@@ -1525,7 +1536,7 @@ func (ds *DistSender) divideAndSendParallelCommit(
15251536

15261537
// Send the batch with withCommit=true since it will be inflight
15271538
// concurrently with the EndTxn batch below.
1528-
reply, pErr := ds.divideAndSendBatchToRanges(ctx, qiBa, qiRS, qiIsReverse, true /* withCommit */, qiBatchIdx)
1539+
reply, pErr := ds.divideAndSendBatchToRanges(ctx, qiBa, qiRS, true /* withCommit */, qiBatchIdx)
15291540
qiResponseCh <- response{reply: reply, positions: positions, pErr: pErr}
15301541
}
15311542

@@ -1556,10 +1567,7 @@ func (ds *DistSender) divideAndSendParallelCommit(
15561567
if err != nil {
15571568
return nil, kvpb.NewError(err)
15581569
}
1559-
// Note that we don't need to recompute isReverse for the updated batch
1560-
// since we only separated out QueryIntentRequests which don't carry the
1561-
// isReverse flag.
1562-
br, pErr = ds.divideAndSendBatchToRanges(ctx, ba, rs, isReverse, true /* withCommit */, batchIdx)
1570+
br, pErr = ds.divideAndSendBatchToRanges(ctx, ba, rs, true /* withCommit */, batchIdx)
15631571

15641572
// Wait for the QueryIntent-only batch to complete and stitch
15651573
// the responses together.
@@ -1728,12 +1736,6 @@ func mergeErrors(pErr1, pErr2 *kvpb.Error) *kvpb.Error {
17281736
// is trimmed against each range which is part of the span and sent
17291737
// either serially or in parallel, if possible.
17301738
//
1731-
// isReverse indicates the direction that the provided span should be
1732-
// iterated over while sending requests. It is passed in by callers
1733-
// instead of being recomputed based on the requests in the batch to
1734-
// prevent the iteration direction from switching midway through a
1735-
// batch, in cases where partial batches recurse into this function.
1736-
//
17371739
// withCommit indicates that the batch contains a transaction commit
17381740
// or that a transaction commit is being run concurrently with this
17391741
// batch. Either way, if this is true then sendToReplicas will need
@@ -1743,12 +1745,7 @@ func mergeErrors(pErr1, pErr2 *kvpb.Error) *kvpb.Error {
17431745
// being processed by this method. It's specified as non-zero when
17441746
// this method is invoked recursively.
17451747
func (ds *DistSender) divideAndSendBatchToRanges(
1746-
ctx context.Context,
1747-
ba *kvpb.BatchRequest,
1748-
rs roachpb.RSpan,
1749-
isReverse bool,
1750-
withCommit bool,
1751-
batchIdx int,
1748+
ctx context.Context, ba *kvpb.BatchRequest, rs roachpb.RSpan, withCommit bool, batchIdx int,
17521749
) (br *kvpb.BatchResponse, pErr *kvpb.Error) {
17531750
// Clone the BatchRequest's transaction so that future mutations to the
17541751
// proto don't affect the proto in this batch.
@@ -1758,7 +1755,7 @@ func (ds *DistSender) divideAndSendBatchToRanges(
17581755
// Get initial seek key depending on direction of iteration.
17591756
var scanDir ScanDirection
17601757
var seekKey roachpb.RKey
1761-
if !isReverse {
1758+
if !ba.IsReverse {
17621759
scanDir = Ascending
17631760
seekKey = rs.Key
17641761
} else {
@@ -1773,7 +1770,7 @@ func (ds *DistSender) divideAndSendBatchToRanges(
17731770
// Take the fast path if this batch fits within a single range.
17741771
if !ri.NeedAnother(rs) {
17751772
resp := ds.sendPartialBatch(
1776-
ctx, ba, rs, isReverse, withCommit, batchIdx, ri.Token(),
1773+
ctx, ba, rs, withCommit, batchIdx, ri.Token(),
17771774
)
17781775
// resp.positions remains nil since the original batch is fully
17791776
// contained within a single range.
@@ -1895,7 +1892,7 @@ func (ds *DistSender) divideAndSendBatchToRanges(
18951892
}
18961893

18971894
if pErr == nil && couldHaveSkippedResponses {
1898-
fillSkippedResponses(ba, br, seekKey, resumeReason, isReverse)
1895+
fillSkippedResponses(ba, br, seekKey, resumeReason)
18991896
}
19001897
}()
19011898

@@ -1975,7 +1972,7 @@ func (ds *DistSender) divideAndSendBatchToRanges(
19751972
// If we can reserve one of the limited goroutines available for parallel
19761973
// batch RPCs, send asynchronously.
19771974
if canParallelize && !lastRange && !ds.disableParallelBatches {
1978-
if ds.sendPartialBatchAsync(ctx, curRangeBatch, curRangeRS, isReverse, withCommit, batchIdx, ri.Token(), responseCh, positions) {
1975+
if ds.sendPartialBatchAsync(ctx, curRangeBatch, curRangeRS, withCommit, batchIdx, ri.Token(), responseCh, positions) {
19791976
asyncSent = true
19801977
} else {
19811978
asyncThrottled = true
@@ -1990,7 +1987,7 @@ func (ds *DistSender) divideAndSendBatchToRanges(
19901987
}()
19911988
}
19921989
return ds.sendPartialBatch(
1993-
ctx, curRangeBatch, curRangeRS, isReverse, withCommit, batchIdx, ri.Token(),
1990+
ctx, curRangeBatch, curRangeRS, withCommit, batchIdx, ri.Token(),
19941991
)
19951992
}()
19961993
resp.positions = positions
@@ -2077,7 +2074,6 @@ func (ds *DistSender) sendPartialBatchAsync(
20772074
ctx context.Context,
20782075
ba *kvpb.BatchRequest,
20792076
rs roachpb.RSpan,
2080-
isReverse bool,
20812077
withCommit bool,
20822078
batchIdx int,
20832079
routing rangecache.EvictionToken,
@@ -2099,7 +2095,7 @@ func (ds *DistSender) sendPartialBatchAsync(
20992095
ds.metrics.AsyncSentCount.Inc(1)
21002096
ds.metrics.AsyncInProgress.Inc(1)
21012097
defer ds.metrics.AsyncInProgress.Dec(1)
2102-
resp := ds.sendPartialBatch(ctx, ba, rs, isReverse, withCommit, batchIdx, routing)
2098+
resp := ds.sendPartialBatch(ctx, ba, rs, withCommit, batchIdx, routing)
21032099
resp.positions = positions
21042100
responseCh <- resp
21052101
}(ctx)
@@ -2163,7 +2159,6 @@ func (ds *DistSender) sendPartialBatch(
21632159
ctx context.Context,
21642160
ba *kvpb.BatchRequest,
21652161
rs roachpb.RSpan,
2166-
isReverse bool,
21672162
withCommit bool,
21682163
batchIdx int,
21692164
routingTok rangecache.EvictionToken,
@@ -2191,7 +2186,7 @@ func (ds *DistSender) sendPartialBatch(
21912186

21922187
if !routingTok.Valid() {
21932188
var descKey roachpb.RKey
2194-
if isReverse {
2189+
if ba.IsReverse {
21952190
descKey = rs.EndKey
21962191
} else {
21972192
descKey = rs.Key
@@ -2207,7 +2202,7 @@ func (ds *DistSender) sendPartialBatch(
22072202
// replica, while detecting hazardous cases where the follower does
22082203
// not have the latest information and the current descriptor did
22092204
// not result in a successful send.
2210-
routingTok, err = ds.getRoutingInfo(ctx, descKey, prevTok, isReverse)
2205+
routingTok, err = ds.getRoutingInfo(ctx, descKey, prevTok, ba.IsReverse)
22112206
if err != nil {
22122207
log.VErrEventf(ctx, 1, "range descriptor re-lookup failed: %s", err)
22132208
// We set pErr if we encountered an error getting the descriptor in
@@ -2230,7 +2225,7 @@ func (ds *DistSender) sendPartialBatch(
22302225
}
22312226
if !intersection.Equal(rs) {
22322227
log.Eventf(ctx, "range shrunk; sub-dividing the request")
2233-
reply, pErr = ds.divideAndSendBatchToRanges(ctx, ba, rs, isReverse, withCommit, batchIdx)
2228+
reply, pErr = ds.divideAndSendBatchToRanges(ctx, ba, rs, withCommit, batchIdx)
22342229
return response{reply: reply, pErr: pErr}
22352230
}
22362231
}
@@ -2332,7 +2327,7 @@ func (ds *DistSender) sendPartialBatch(
23322327
// batch here would give a potentially larger response slice
23332328
// with unknown mapping to our truncated reply).
23342329
log.VEventf(ctx, 1, "likely split; will resend. Got new descriptors: %s", tErr.Ranges)
2335-
reply, pErr = ds.divideAndSendBatchToRanges(ctx, ba, rs, isReverse, withCommit, batchIdx)
2330+
reply, pErr = ds.divideAndSendBatchToRanges(ctx, ba, rs, withCommit, batchIdx)
23362331
return response{reply: reply, pErr: pErr}
23372332
}
23382333
break
@@ -2381,7 +2376,6 @@ func fillSkippedResponses(
23812376
br *kvpb.BatchResponse,
23822377
nextKey roachpb.RKey,
23832378
resumeReason kvpb.ResumeReason,
2384-
isReverse bool,
23852379
) {
23862380
// Some requests might have no response at all if we used a batch-wide
23872381
// limit; simply create trivial responses for those. Note that any type
@@ -2411,7 +2405,7 @@ func fillSkippedResponses(
24112405
for i, resp := range br.Responses {
24122406
req := ba.Requests[i].GetInner()
24132407
hdr := resp.GetInner().Header()
2414-
maybeSetResumeSpan(req, &hdr, nextKey, isReverse)
2408+
maybeSetResumeSpan(req, &hdr, nextKey, ba.IsReverse)
24152409
if hdr.ResumeSpan != nil {
24162410
hdr.ResumeReason = resumeReason
24172411
}

pkg/kv/kvclient/kvcoord/dist_sender_server_test.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1012,11 +1012,10 @@ func TestMultiRequestBatchWithFwdAndReverseRequests(t *testing.T) {
10121012
t.Fatal(err)
10131013
}
10141014
b := &kv.Batch{}
1015-
b.Header.MaxSpanRequestKeys = 100
10161015
b.Scan("a", "b")
10171016
b.ReverseScan("a", "b")
10181017
if err := db.Run(ctx, b); !testutils.IsError(
1019-
err, "batch with limit contains both forward and reverse scans",
1018+
err, "batch contains both forward and reverse requests",
10201019
) {
10211020
t.Fatal(err)
10221021
}

pkg/kv/kvclient/kvstreamer/streamer.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1398,6 +1398,7 @@ func (w *workerCoordinator) performRequestAsync(
13981398
ba.Header.TargetBytes = targetBytes
13991399
ba.Header.AllowEmpty = !headOfLine
14001400
ba.Header.WholeRowsOfSize = w.s.maxKeysPerRow
1401+
ba.Header.IsReverse = w.s.reverse
14011402
// TODO(yuzefovich): consider setting MaxSpanRequestKeys whenever
14021403
// applicable (#67885).
14031404
ba.AdmissionHeader = w.requestAdmissionHeader

pkg/kv/kvnemesis/applier_test.go

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -252,10 +252,16 @@ func TestApplier(t *testing.T) {
252252
"txn-si-err", step(closureTxn(ClosureTxnType_Commit, isolation.Snapshot, delRange(k2, k4, 1))),
253253
},
254254
{
255-
"batch-mixed", step(batch(put(k2, 2), get(k1), del(k2, 1), del(k3, 1), scan(k1, k3), reverseScanForUpdate(k1, k5))),
255+
"batch-mixed-fwd", step(batch(put(k2, 2), get(k1), del(k2, 1), del(k3, 1), scan(k1, k3))),
256256
},
257257
{
258-
"batch-mixed-err", step(batch(put(k2, 2), getForUpdate(k1), scanForUpdate(k1, k3), reverseScan(k1, k3))),
258+
"batch-mixed-rev", step(batch(put(k2, 2), get(k1), del(k2, 1), del(k3, 1), reverseScanForUpdate(k1, k5))),
259+
},
260+
{
261+
"batch-mixed-err-fwd", step(batch(put(k2, 2), getForUpdate(k1), scanForUpdate(k1, k3))),
262+
},
263+
{
264+
"batch-mixed-err-rev", step(batch(put(k2, 2), getForUpdate(k1), reverseScan(k1, k3))),
259265
},
260266
{
261267
"txn-ssi-commit-mixed", step(closureTxn(ClosureTxnType_Commit, isolation.Serializable, put(k5, 5), batch(put(k6, 6), delRange(k3, k5, 1)))),

pkg/kv/kvnemesis/generator.go

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1484,8 +1484,28 @@ func makeRandBatch(c *ClientOperationConfig) opGenFunc {
14841484
g.registerClientOps(&allowed, c)
14851485
numOps := rng.Intn(4)
14861486
ops := make([]Operation, numOps)
1487-
for i := range ops {
1487+
var addedForwardScan, addedReverseScan bool
1488+
for i := 0; i < numOps; i++ {
14881489
ops[i] = g.selectOp(rng, allowed)
1490+
if ops[i].Scan != nil {
1491+
if !ops[i].Scan.Reverse {
1492+
if addedReverseScan {
1493+
// We cannot include the forward scan into the batch
1494+
// that already contains the reverse scan.
1495+
i--
1496+
continue
1497+
}
1498+
addedForwardScan = true
1499+
} else {
1500+
if addedForwardScan {
1501+
// We cannot include the reverse scan into the batch
1502+
// that already contains the forward scan.
1503+
i--
1504+
continue
1505+
}
1506+
addedReverseScan = true
1507+
}
1508+
}
14891509
}
14901510
return batch(ops...)
14911511
}

pkg/kv/kvnemesis/testdata/TestApplier/batch-mixed-err

Lines changed: 0 additions & 10 deletions
This file was deleted.

0 commit comments

Comments
 (0)