Skip to content

Commit 805a7f2

Browse files
craig[bot]arulajmanipav-kv
committed
139612: requestbatcher: batch requests per-AC priority r=arulajmani a=arulajmani Previously, the batcher would batch requests to a given range together, and merge AC headers to pick the most permissive. This meant that lower priority AC work could get bumped to higher priority. This patch prevents this by batching per-range, per-AC priority. There's some ugliness here around requests that bypass AC entirely. They do so by setting the Source field; to ensure these are batched correctly, we bucket them separately. Closes #136029 Release note: None 147469: stateloader: untangle SynthesizeRaftState r=pav-kv a=pav-kv This PR starts untangling the opaque code in `SynthesizeHardState` and `SynthesizeRaftState`, with the goals of (a) making it simple, (b) decoupling log and state machine reads/writes. Right now, these funcs are used in exactly 2 cases: 1. during cluster bootstrap, for creating the initial set of ranges 2. when applying a split trigger, for initializing the RHS (which is uninitialized and only possibly can have a `HardState`) In both cases, the replica's state machine and log are initialized at `RaftInitialLogIndex/Term`. This means that everything these funcs generate is mostly based on constants (modulo the `HardState` in case 2). But it is really hard to tell by reading it. After this PR, case (1) is decoupled into its own `WriteInitialRaftState` func, and directly produces the right output. The "synthesize" funcs are now only used by splits, and can be further simplified in follow-up PRs. The first half of this PR adds a datadriven test for `WriteInitialRangeState` showing the state of a typical replica created at cluster bootstrap. The printout only includes `RangeID`-local state, and no user space keys. The test helps ensuring that subsequent commits are no-ops. Epic: CRDB-49111 Co-authored-by: Arul Ajmani <[email protected]> Co-authored-by: Pavel Kalinnikov <[email protected]>
3 parents 44563fe + f433f3e + b8f7a55 commit 805a7f2

File tree

12 files changed

+434
-77
lines changed

12 files changed

+434
-77
lines changed

pkg/internal/client/requestbatcher/BUILD.bazel

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@ go_library(
99
"//pkg/kv",
1010
"//pkg/kv/kvpb",
1111
"//pkg/roachpb",
12+
"//pkg/util/admission/admissionpb",
13+
"//pkg/util/buildutil",
1214
"//pkg/util/log",
1315
"//pkg/util/stop",
1416
"//pkg/util/timeutil",
@@ -25,7 +27,9 @@ go_test(
2527
"//pkg/kv/kvpb",
2628
"//pkg/roachpb",
2729
"//pkg/testutils",
30+
"//pkg/util/admission/admissionpb",
2831
"//pkg/util/leaktest",
32+
"//pkg/util/log",
2933
"//pkg/util/stop",
3034
"//pkg/util/timeutil",
3135
"@com_github_cockroachdb_errors//:errors",

pkg/internal/client/requestbatcher/batcher.go

Lines changed: 81 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -12,9 +12,8 @@
1212
// this library.
1313
//
1414
// Batching assumes that data with the same key can be sent in a single batch.
15-
// The initial implementation uses rangeID as the key explicitly to avoid
16-
// creating an overly general solution without motivation but interested readers
17-
// should recognize that it would be easy to extend this package to accept an
15+
// The implementation here uses rangeID and admission priority to construct
16+
// batches. This may be extended, or generalized, in the future by accepting a
1817
// arbitrary comparable key.
1918
package requestbatcher
2019

@@ -27,6 +26,8 @@ import (
2726
"github.com/cockroachdb/cockroach/pkg/kv"
2827
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
2928
"github.com/cockroachdb/cockroach/pkg/roachpb"
29+
"github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb"
30+
"github.com/cockroachdb/cockroach/pkg/util/buildutil"
3031
"github.com/cockroachdb/cockroach/pkg/util/log"
3132
"github.com/cockroachdb/cockroach/pkg/util/stop"
3233
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
@@ -162,8 +163,8 @@ type Config struct {
162163
// RequestBatcher -- a BatchRequest on that range will timeout, which will
163164
// timeout and fail the entire batch.
164165
//
165-
// TODO(sumeer): once intent resolution is subject to admission control, we
166-
// could have timeouts even though a range is available. Is that desirable?
166+
// TODO(sumeer): we could have timeouts even though a range is available. Is
167+
// that desirable?
167168
MaxTimeout time.Duration
168169

169170
// InFlightBackpressureLimit is the number of batches in flight above which
@@ -437,6 +438,11 @@ func (b *RequestBatcher) sendResponse(req *request, resp Response) {
437438
}
438439

439440
func addRequestToBatch(cfg *Config, now time.Time, ba *batch, r *request) (shouldSend bool) {
441+
testingAssert(ba.empty() || admissionPriority(ba.admissionHeader()) == admissionPriority(r.header),
442+
"requests with different admission headers shouldn't be added to the same batch")
443+
testingAssert(ba.empty() || ba.rangeID() == r.rangeID,
444+
"requests with different range IDs shouldn't be added to the same batch")
445+
440446
// Update the deadline for the batch if this requests's deadline is later
441447
// than the current latest.
442448
rDeadline, rHasDeadline := r.ctx.Deadline()
@@ -521,7 +527,7 @@ func (b *RequestBatcher) run(ctx context.Context) {
521527
}
522528
handleRequest = func(req *request) {
523529
now := b.now()
524-
ba, existsInQueue := b.batches.get(req.rangeID)
530+
ba, existsInQueue := b.batches.get(req.rangeID, req.header)
525531
if !existsInQueue {
526532
ba = b.pool.newBatch(now)
527533
}
@@ -614,27 +620,49 @@ func (b *batch) rangeID() roachpb.RangeID {
614620
return b.reqs[0].rangeID
615621
}
616622

617-
func (b *batch) batchRequest(cfg *Config) *kvpb.BatchRequest {
618-
req := &kvpb.BatchRequest{
619-
// Preallocate the Requests slice.
620-
Requests: make([]kvpb.RequestUnion, 0, len(b.reqs)),
623+
// admissionPriority returns the priority with which to bucket requests with
624+
// the supplied header.
625+
func admissionPriority(header kvpb.AdmissionHeader) int32 {
626+
if header.Source == kvpb.AdmissionHeader_OTHER {
627+
// AdmissionHeader_OTHER bypass admission control, so bucket them separately
628+
// and treat them as the highest priority.
629+
return int32(admissionpb.OneAboveHighPri)
621630
}
631+
return header.Priority
632+
}
633+
634+
func (b *batch) admissionHeader() kvpb.AdmissionHeader {
635+
testingAssert(len(b.reqs) != 0, "admission header should not be called on an empty batch")
622636
var admissionHeader kvpb.AdmissionHeader
623637
for i, r := range b.reqs {
624-
req.Add(r.req)
625638
if i == 0 {
626639
admissionHeader = r.header
627640
} else {
628641
admissionHeader = kv.MergeAdmissionHeaderForBatch(admissionHeader, r.header)
629642
}
630643
}
644+
return admissionHeader
645+
}
646+
647+
func (b *batch) empty() bool {
648+
return len(b.reqs) == 0
649+
}
650+
651+
func (b *batch) batchRequest(cfg *Config) *kvpb.BatchRequest {
652+
req := &kvpb.BatchRequest{
653+
// Preallocate the Requests slice.
654+
Requests: make([]kvpb.RequestUnion, 0, len(b.reqs)),
655+
}
656+
for _, r := range b.reqs {
657+
req.Add(r.req)
658+
}
631659
if cfg.MaxKeysPerBatchReq > 0 {
632660
req.MaxSpanRequestKeys = int64(cfg.MaxKeysPerBatchReq)
633661
}
634662
if cfg.TargetBytesPerBatchReq > 0 {
635663
req.TargetBytes = cfg.TargetBytesPerBatchReq
636664
}
637-
req.AdmissionHeader = admissionHeader
665+
req.AdmissionHeader = b.admissionHeader()
638666
return req
639667
}
640668

@@ -705,6 +733,23 @@ func (p *pool) putBatch(b *batch) {
705733
p.batchPool.Put(b)
706734
}
707735

736+
// rangePriorityTuple is a container for a RangeID and admission priority pair.
737+
// It's intended to allow the batchQueue to build per-range, per-priority
738+
// batches.
739+
type rangePriorityPair struct {
740+
rangeID roachpb.RangeID
741+
priority int32
742+
}
743+
744+
// makeRangePriorityPair returns a new rangePriorityPair for the supplied
745+
// rangeID and admission header.
746+
func makeRangePriorityPair(rangeID roachpb.RangeID, header kvpb.AdmissionHeader) rangePriorityPair {
747+
return rangePriorityPair{
748+
rangeID: rangeID,
749+
priority: admissionPriority(header),
750+
}
751+
}
752+
708753
// batchQueue is a container for batch objects which offers O(1) get based on
709754
// rangeID and peekFront as well as O(log(n)) upsert, removal, popFront.
710755
// Batch structs are heap ordered inside of the batches slice based on their
@@ -717,14 +762,14 @@ func (p *pool) putBatch(b *batch) {
717762
// per RequestBatcher.
718763
type batchQueue struct {
719764
batches []*batch
720-
byRange map[roachpb.RangeID]*batch
765+
byRange map[rangePriorityPair]*batch
721766
}
722767

723768
var _ heap.Interface = (*batchQueue)(nil)
724769

725770
func makeBatchQueue() batchQueue {
726771
return batchQueue{
727-
byRange: map[roachpb.RangeID]*batch{},
772+
byRange: map[rangePriorityPair]*batch{},
728773
}
729774
}
730775

@@ -742,8 +787,8 @@ func (q *batchQueue) popFront() *batch {
742787
return heap.Pop(q).(*batch)
743788
}
744789

745-
func (q *batchQueue) get(id roachpb.RangeID) (*batch, bool) {
746-
b, exists := q.byRange[id]
790+
func (q *batchQueue) get(id roachpb.RangeID, header kvpb.AdmissionHeader) (*batch, bool) {
791+
b, exists := q.byRange[makeRangePriorityPair(id, header)]
747792
return b, exists
748793
}
749794

@@ -771,24 +816,40 @@ func (q *batchQueue) Swap(i, j int) {
771816

772817
func (q *batchQueue) Less(i, j int) bool {
773818
idl, jdl := q.batches[i].deadline, q.batches[j].deadline
774-
if before := idl.Before(jdl); before || !idl.Equal(jdl) {
775-
return before
819+
if !idl.Equal(jdl) {
820+
return idl.Before(jdl)
821+
}
822+
iPri, jPri := admissionPriority(q.batches[i].admissionHeader()), admissionPriority(q.batches[j].admissionHeader())
823+
if iPri != jPri {
824+
// NB: We've got a min-heap, so we want to prefer higher AC priorities. In
825+
// practice, this doesn't matter, because the batcher sends out all batches
826+
// with the same deadline in parallel. See RequestBatcher.run.
827+
return iPri > jPri
776828
}
829+
// Equal AC priorities; arbitrarily sort by rangeID.
777830
return q.batches[i].rangeID() < q.batches[j].rangeID()
778831
}
779832

780833
func (q *batchQueue) Push(v interface{}) {
781834
ba := v.(*batch)
782835
ba.idx = len(q.batches)
783-
q.byRange[ba.rangeID()] = ba
836+
q.byRange[makeRangePriorityPair(ba.rangeID(), ba.admissionHeader())] = ba
784837
q.batches = append(q.batches, ba)
785838
}
786839

787840
func (q *batchQueue) Pop() interface{} {
788841
ba := q.batches[len(q.batches)-1]
789842
q.batches[len(q.batches)-1] = nil // for GC
790843
q.batches = q.batches[:len(q.batches)-1]
791-
delete(q.byRange, ba.rangeID())
844+
delete(q.byRange, makeRangePriorityPair(ba.rangeID(), ba.admissionHeader()))
792845
ba.idx = -1
793846
return ba
794847
}
848+
849+
// testingAssert panics with the supplied message if the conditional doesn't
850+
// hold.
851+
func testingAssert(cond bool, msg string) {
852+
if buildutil.CrdbTestBuild && !cond {
853+
panic(msg)
854+
}
855+
}

0 commit comments

Comments
 (0)