12
12
// this library.
13
13
//
14
14
// Batching assumes that data with the same key can be sent in a single batch.
15
- // The initial implementation uses rangeID as the key explicitly to avoid
16
- // creating an overly general solution without motivation but interested readers
17
- // should recognize that it would be easy to extend this package to accept an
15
+ // The implementation here uses rangeID and admission priority to construct
16
+ // batches. This may be extended, or generalized, in the future by accepting a
18
17
// arbitrary comparable key.
19
18
package requestbatcher
20
19
@@ -27,6 +26,8 @@ import (
27
26
"github.com/cockroachdb/cockroach/pkg/kv"
28
27
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
29
28
"github.com/cockroachdb/cockroach/pkg/roachpb"
29
+ "github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb"
30
+ "github.com/cockroachdb/cockroach/pkg/util/buildutil"
30
31
"github.com/cockroachdb/cockroach/pkg/util/log"
31
32
"github.com/cockroachdb/cockroach/pkg/util/stop"
32
33
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
@@ -162,8 +163,8 @@ type Config struct {
162
163
// RequestBatcher -- a BatchRequest on that range will timeout, which will
163
164
// timeout and fail the entire batch.
164
165
//
165
- // TODO(sumeer): once intent resolution is subject to admission control, we
166
- // could have timeouts even though a range is available. Is that desirable?
166
+ // TODO(sumeer): we could have timeouts even though a range is available. Is
167
+ // that desirable?
167
168
MaxTimeout time.Duration
168
169
169
170
// InFlightBackpressureLimit is the number of batches in flight above which
@@ -437,6 +438,11 @@ func (b *RequestBatcher) sendResponse(req *request, resp Response) {
437
438
}
438
439
439
440
func addRequestToBatch (cfg * Config , now time.Time , ba * batch , r * request ) (shouldSend bool ) {
441
+ testingAssert (ba .empty () || admissionPriority (ba .admissionHeader ()) == admissionPriority (r .header ),
442
+ "requests with different admission headers shouldn't be added to the same batch" )
443
+ testingAssert (ba .empty () || ba .rangeID () == r .rangeID ,
444
+ "requests with different range IDs shouldn't be added to the same batch" )
445
+
440
446
// Update the deadline for the batch if this requests's deadline is later
441
447
// than the current latest.
442
448
rDeadline , rHasDeadline := r .ctx .Deadline ()
@@ -521,7 +527,7 @@ func (b *RequestBatcher) run(ctx context.Context) {
521
527
}
522
528
handleRequest = func (req * request ) {
523
529
now := b .now ()
524
- ba , existsInQueue := b .batches .get (req .rangeID )
530
+ ba , existsInQueue := b .batches .get (req .rangeID , req . header )
525
531
if ! existsInQueue {
526
532
ba = b .pool .newBatch (now )
527
533
}
@@ -614,27 +620,49 @@ func (b *batch) rangeID() roachpb.RangeID {
614
620
return b .reqs [0 ].rangeID
615
621
}
616
622
617
- func (b * batch ) batchRequest (cfg * Config ) * kvpb.BatchRequest {
618
- req := & kvpb.BatchRequest {
619
- // Preallocate the Requests slice.
620
- Requests : make ([]kvpb.RequestUnion , 0 , len (b .reqs )),
623
+ // admissionPriority returns the priority with which to bucket requests with
624
+ // the supplied header.
625
+ func admissionPriority (header kvpb.AdmissionHeader ) int32 {
626
+ if header .Source == kvpb .AdmissionHeader_OTHER {
627
+ // AdmissionHeader_OTHER bypass admission control, so bucket them separately
628
+ // and treat them as the highest priority.
629
+ return int32 (admissionpb .OneAboveHighPri )
621
630
}
631
+ return header .Priority
632
+ }
633
+
634
+ func (b * batch ) admissionHeader () kvpb.AdmissionHeader {
635
+ testingAssert (len (b .reqs ) != 0 , "admission header should not be called on an empty batch" )
622
636
var admissionHeader kvpb.AdmissionHeader
623
637
for i , r := range b .reqs {
624
- req .Add (r .req )
625
638
if i == 0 {
626
639
admissionHeader = r .header
627
640
} else {
628
641
admissionHeader = kv .MergeAdmissionHeaderForBatch (admissionHeader , r .header )
629
642
}
630
643
}
644
+ return admissionHeader
645
+ }
646
+
647
+ func (b * batch ) empty () bool {
648
+ return len (b .reqs ) == 0
649
+ }
650
+
651
+ func (b * batch ) batchRequest (cfg * Config ) * kvpb.BatchRequest {
652
+ req := & kvpb.BatchRequest {
653
+ // Preallocate the Requests slice.
654
+ Requests : make ([]kvpb.RequestUnion , 0 , len (b .reqs )),
655
+ }
656
+ for _ , r := range b .reqs {
657
+ req .Add (r .req )
658
+ }
631
659
if cfg .MaxKeysPerBatchReq > 0 {
632
660
req .MaxSpanRequestKeys = int64 (cfg .MaxKeysPerBatchReq )
633
661
}
634
662
if cfg .TargetBytesPerBatchReq > 0 {
635
663
req .TargetBytes = cfg .TargetBytesPerBatchReq
636
664
}
637
- req .AdmissionHeader = admissionHeader
665
+ req .AdmissionHeader = b . admissionHeader ()
638
666
return req
639
667
}
640
668
@@ -705,6 +733,23 @@ func (p *pool) putBatch(b *batch) {
705
733
p .batchPool .Put (b )
706
734
}
707
735
736
+ // rangePriorityTuple is a container for a RangeID and admission priority pair.
737
+ // It's intended to allow the batchQueue to build per-range, per-priority
738
+ // batches.
739
+ type rangePriorityPair struct {
740
+ rangeID roachpb.RangeID
741
+ priority int32
742
+ }
743
+
744
+ // makeRangePriorityPair returns a new rangePriorityPair for the supplied
745
+ // rangeID and admission header.
746
+ func makeRangePriorityPair (rangeID roachpb.RangeID , header kvpb.AdmissionHeader ) rangePriorityPair {
747
+ return rangePriorityPair {
748
+ rangeID : rangeID ,
749
+ priority : admissionPriority (header ),
750
+ }
751
+ }
752
+
708
753
// batchQueue is a container for batch objects which offers O(1) get based on
709
754
// rangeID and peekFront as well as O(log(n)) upsert, removal, popFront.
710
755
// Batch structs are heap ordered inside of the batches slice based on their
@@ -717,14 +762,14 @@ func (p *pool) putBatch(b *batch) {
717
762
// per RequestBatcher.
718
763
type batchQueue struct {
719
764
batches []* batch
720
- byRange map [roachpb. RangeID ]* batch
765
+ byRange map [rangePriorityPair ]* batch
721
766
}
722
767
723
768
var _ heap.Interface = (* batchQueue )(nil )
724
769
725
770
func makeBatchQueue () batchQueue {
726
771
return batchQueue {
727
- byRange : map [roachpb. RangeID ]* batch {},
772
+ byRange : map [rangePriorityPair ]* batch {},
728
773
}
729
774
}
730
775
@@ -742,8 +787,8 @@ func (q *batchQueue) popFront() *batch {
742
787
return heap .Pop (q ).(* batch )
743
788
}
744
789
745
- func (q * batchQueue ) get (id roachpb.RangeID ) (* batch , bool ) {
746
- b , exists := q .byRange [id ]
790
+ func (q * batchQueue ) get (id roachpb.RangeID , header kvpb. AdmissionHeader ) (* batch , bool ) {
791
+ b , exists := q .byRange [makeRangePriorityPair ( id , header ) ]
747
792
return b , exists
748
793
}
749
794
@@ -771,24 +816,40 @@ func (q *batchQueue) Swap(i, j int) {
771
816
772
817
func (q * batchQueue ) Less (i , j int ) bool {
773
818
idl , jdl := q .batches [i ].deadline , q .batches [j ].deadline
774
- if before := idl .Before (jdl ); before || ! idl .Equal (jdl ) {
775
- return before
819
+ if ! idl .Equal (jdl ) {
820
+ return idl .Before (jdl )
821
+ }
822
+ iPri , jPri := admissionPriority (q .batches [i ].admissionHeader ()), admissionPriority (q .batches [j ].admissionHeader ())
823
+ if iPri != jPri {
824
+ // NB: We've got a min-heap, so we want to prefer higher AC priorities. In
825
+ // practice, this doesn't matter, because the batcher sends out all batches
826
+ // with the same deadline in parallel. See RequestBatcher.run.
827
+ return iPri > jPri
776
828
}
829
+ // Equal AC priorities; arbitrarily sort by rangeID.
777
830
return q .batches [i ].rangeID () < q .batches [j ].rangeID ()
778
831
}
779
832
780
833
func (q * batchQueue ) Push (v interface {}) {
781
834
ba := v .(* batch )
782
835
ba .idx = len (q .batches )
783
- q .byRange [ba .rangeID ()] = ba
836
+ q .byRange [makeRangePriorityPair ( ba .rangeID (), ba . admissionHeader () )] = ba
784
837
q .batches = append (q .batches , ba )
785
838
}
786
839
787
840
func (q * batchQueue ) Pop () interface {} {
788
841
ba := q .batches [len (q .batches )- 1 ]
789
842
q .batches [len (q .batches )- 1 ] = nil // for GC
790
843
q .batches = q .batches [:len (q .batches )- 1 ]
791
- delete (q .byRange , ba .rangeID ())
844
+ delete (q .byRange , makeRangePriorityPair ( ba .rangeID (), ba . admissionHeader () ))
792
845
ba .idx = - 1
793
846
return ba
794
847
}
848
+
849
+ // testingAssert panics with the supplied message if the conditional doesn't
850
+ // hold.
851
+ func testingAssert (cond bool , msg string ) {
852
+ if buildutil .CrdbTestBuild && ! cond {
853
+ panic (msg )
854
+ }
855
+ }
0 commit comments