@@ -635,17 +635,17 @@ func (h baseQueueHelper) MaybeAdd(
635
635
}
636
636
637
637
func (h baseQueueHelper ) Add (
638
- ctx context.Context , repl replicaInQueue , prio float64 , processCallback processCallback ,
638
+ ctx context.Context , repl replicaInQueue , prio float64 , cb processCallback ,
639
639
) {
640
- _ , err := h .bq .addInternal (ctx , repl .Desc (), repl .ReplicaID (), prio , processCallback )
640
+ _ , err := h .bq .addInternal (ctx , repl .Desc (), repl .ReplicaID (), prio , cb )
641
641
if err != nil && log .V (1 ) {
642
642
log .Dev .Infof (ctx , "during Add: %s" , err )
643
643
}
644
644
}
645
645
646
646
type queueHelper interface {
647
647
MaybeAdd (ctx context.Context , repl replicaInQueue , now hlc.ClockTimestamp )
648
- Add (ctx context.Context , repl replicaInQueue , prio float64 , processCallback processCallback )
648
+ Add (ctx context.Context , repl replicaInQueue , prio float64 , cb processCallback )
649
649
}
650
650
651
651
// baseQueueAsyncRateLimited indicates that the base queue async task was rate
@@ -703,12 +703,12 @@ func (bq *baseQueue) MaybeAddAsync(
703
703
// register a process callback that will be invoked when the replica is enqueued
704
704
// or processed.
705
705
func (bq * baseQueue ) AddAsyncWithCallback (
706
- ctx context.Context , repl replicaInQueue , prio float64 , processCallback processCallback ,
706
+ ctx context.Context , repl replicaInQueue , prio float64 , cb processCallback ,
707
707
) {
708
708
if err := bq .Async (ctx , "Add" , true /* wait */ , func (ctx context.Context , h queueHelper ) {
709
- h .Add (ctx , repl , prio , processCallback )
709
+ h .Add (ctx , repl , prio , cb )
710
710
}); err != nil {
711
- processCallback .onEnqueueResult (- 1 /*indexOnHeap*/ , err )
711
+ cb .onEnqueueResult (- 1 /*indexOnHeap*/ , err )
712
712
}
713
713
}
714
714
@@ -814,22 +814,22 @@ func (bq *baseQueue) addInternal(
814
814
desc * roachpb.RangeDescriptor ,
815
815
replicaID roachpb.ReplicaID ,
816
816
priority float64 ,
817
- processCallback processCallback ,
817
+ cb processCallback ,
818
818
) (bool , error ) {
819
819
// NB: this is intentionally outside of bq.mu to avoid having to consider
820
820
// lock ordering constraints.
821
821
if ! desc .IsInitialized () {
822
822
// We checked this above in MaybeAdd(), but we need to check it
823
823
// again for Add().
824
- processCallback .onEnqueueResult (- 1 /*indexOnHeap*/ , errReplicaNotInitialized )
824
+ cb .onEnqueueResult (- 1 /*indexOnHeap*/ , errReplicaNotInitialized )
825
825
return false , errReplicaNotInitialized
826
826
}
827
827
828
828
bq .mu .Lock ()
829
829
defer bq .mu .Unlock ()
830
830
831
831
if bq .mu .stopped {
832
- processCallback .onEnqueueResult (- 1 /*indexOnHeap*/ , errQueueStopped )
832
+ cb .onEnqueueResult (- 1 /*indexOnHeap*/ , errQueueStopped )
833
833
return false , errQueueStopped
834
834
}
835
835
@@ -842,14 +842,14 @@ func (bq *baseQueue) addInternal(
842
842
if log .V (3 ) {
843
843
log .Dev .Infof (ctx , "queue disabled" )
844
844
}
845
- processCallback .onEnqueueResult (- 1 /*indexOnHeap*/ , errQueueDisabled )
845
+ cb .onEnqueueResult (- 1 /*indexOnHeap*/ , errQueueDisabled )
846
846
return false , errQueueDisabled
847
847
}
848
848
}
849
849
850
850
// If the replica is currently in purgatory, don't re-add it.
851
851
if _ , ok := bq .mu .purgatory [desc .RangeID ]; ok {
852
- processCallback .onEnqueueResult (- 1 /*indexOnHeap*/ , errReplicaAlreadyInPurgatory )
852
+ cb .onEnqueueResult (- 1 /*indexOnHeap*/ , errReplicaAlreadyInPurgatory )
853
853
return false , nil
854
854
}
855
855
@@ -859,7 +859,7 @@ func (bq *baseQueue) addInternal(
859
859
if item .processing {
860
860
wasRequeued := item .requeue
861
861
item .requeue = true
862
- processCallback .onEnqueueResult (- 1 /*indexOnHeap*/ , errReplicaAlreadyProcessing )
862
+ cb .onEnqueueResult (- 1 /*indexOnHeap*/ , errReplicaAlreadyProcessing )
863
863
return ! wasRequeued , nil
864
864
}
865
865
@@ -871,8 +871,8 @@ func (bq *baseQueue) addInternal(
871
871
log .Dev .Infof (ctx , "updating priority: %0.3f -> %0.3f" , item .priority , priority )
872
872
}
873
873
bq .mu .priorityQ .update (item , priority )
874
- // item.index should be updated now based on heap property now.
875
- processCallback .onEnqueueResult (item .index /*indexOnHeap*/ , nil )
874
+ // item.index should be updated now based on heap property now.
875
+ cb .onEnqueueResult (item .index /*indexOnHeap*/ , nil )
876
876
}
877
877
return false , nil
878
878
}
@@ -881,7 +881,7 @@ func (bq *baseQueue) addInternal(
881
881
log .Dev .Infof (ctx , "adding: priority=%0.3f" , priority )
882
882
}
883
883
item = & replicaItem {rangeID : desc .RangeID , replicaID : replicaID , priority : priority }
884
- item .registerCallback (processCallback )
884
+ item .registerCallback (cb )
885
885
bq .addLocked (item )
886
886
887
887
// If adding this replica has pushed the queue past its maximum size, remove
@@ -895,8 +895,8 @@ func (bq *baseQueue) addInternal(
895
895
priority , replicaItemToDrop .replicaID )
896
896
// TODO(wenyihu6): when we introduce base queue max size cluster setting,
897
897
// remember to invoke this callback when shrinking the size
898
- for _ , cb := range replicaItemToDrop .callbacks {
899
- cb .onEnqueueResult (- 1 /*indexOnHeap*/ , errDroppedDueToFullQueueSize )
898
+ for _ , callback := range replicaItemToDrop .callbacks {
899
+ callback .onEnqueueResult (- 1 /*indexOnHeap*/ , errDroppedDueToFullQueueSize )
900
900
}
901
901
bq .removeLocked (replicaItemToDrop )
902
902
}
@@ -907,7 +907,7 @@ func (bq *baseQueue) addInternal(
907
907
// No need to signal again.
908
908
}
909
909
// Note: it may already be dropped or dropped afterwards.
910
- processCallback .onEnqueueResult (item .index /*indexOnHeap*/ , nil )
910
+ cb .onEnqueueResult (item .index /*indexOnHeap*/ , nil )
911
911
return true , nil
912
912
}
913
913
@@ -1357,7 +1357,7 @@ func (bq *baseQueue) addToPurgatoryLocked(
1357
1357
repl replicaInQueue ,
1358
1358
purgErr PurgatoryError ,
1359
1359
priorityAtEnqueue float64 ,
1360
- processCallback []processCallback ,
1360
+ cbs []processCallback ,
1361
1361
) {
1362
1362
bq .mu .AssertHeld ()
1363
1363
@@ -1386,7 +1386,7 @@ func (bq *baseQueue) addToPurgatoryLocked(
1386
1386
replicaID : repl .ReplicaID (),
1387
1387
index : - 1 ,
1388
1388
priority : priorityAtEnqueue ,
1389
- callbacks : processCallback ,
1389
+ callbacks : cbs ,
1390
1390
}
1391
1391
1392
1392
bq .mu .replicas [repl .GetRangeID ()] = item
0 commit comments