@@ -660,17 +660,17 @@ func (h baseQueueHelper) MaybeAdd(
660
660
}
661
661
662
662
func (h baseQueueHelper ) Add (
663
- ctx context.Context , repl replicaInQueue , prio float64 , processCallback processCallback ,
663
+ ctx context.Context , repl replicaInQueue , prio float64 , cb processCallback ,
664
664
) {
665
- _ , err := h .bq .addInternal (ctx , repl .Desc (), repl .ReplicaID (), prio , processCallback )
665
+ _ , err := h .bq .addInternal (ctx , repl .Desc (), repl .ReplicaID (), prio , cb )
666
666
if err != nil && log .V (1 ) {
667
667
log .Infof (ctx , "during Add: %s" , err )
668
668
}
669
669
}
670
670
671
671
type queueHelper interface {
672
672
MaybeAdd (ctx context.Context , repl replicaInQueue , now hlc.ClockTimestamp )
673
- Add (ctx context.Context , repl replicaInQueue , prio float64 , processCallback processCallback )
673
+ Add (ctx context.Context , repl replicaInQueue , prio float64 , cb processCallback )
674
674
}
675
675
676
676
// baseQueueAsyncRateLimited indicates that the base queue async task was rate
@@ -727,12 +727,12 @@ func (bq *baseQueue) MaybeAddAsync(
727
727
// register a process callback that will be invoked when the replica is enqueued
728
728
// or processed.
729
729
func (bq * baseQueue ) AddAsyncWithCallback (
730
- ctx context.Context , repl replicaInQueue , prio float64 , processCallback processCallback ,
730
+ ctx context.Context , repl replicaInQueue , prio float64 , cb processCallback ,
731
731
) {
732
732
if err := bq .Async (ctx , "Add" , true /* wait */ , func (ctx context.Context , h queueHelper ) {
733
- h .Add (ctx , repl , prio , processCallback )
733
+ h .Add (ctx , repl , prio , cb )
734
734
}); err != nil {
735
- processCallback .onEnqueueResult (- 1 /*indexOnHeap*/ , err )
735
+ cb .onEnqueueResult (- 1 /*indexOnHeap*/ , err )
736
736
}
737
737
}
738
738
@@ -838,22 +838,22 @@ func (bq *baseQueue) addInternal(
838
838
desc * roachpb.RangeDescriptor ,
839
839
replicaID roachpb.ReplicaID ,
840
840
priority float64 ,
841
- processCallback processCallback ,
841
+ cb processCallback ,
842
842
) (bool , error ) {
843
843
// NB: this is intentionally outside of bq.mu to avoid having to consider
844
844
// lock ordering constraints.
845
845
if ! desc .IsInitialized () {
846
846
// We checked this above in MaybeAdd(), but we need to check it
847
847
// again for Add().
848
- processCallback .onEnqueueResult (- 1 /*indexOnHeap*/ , errReplicaNotInitialized )
848
+ cb .onEnqueueResult (- 1 /*indexOnHeap*/ , errReplicaNotInitialized )
849
849
return false , errReplicaNotInitialized
850
850
}
851
851
852
852
bq .mu .Lock ()
853
853
defer bq .mu .Unlock ()
854
854
855
855
if bq .mu .stopped {
856
- processCallback .onEnqueueResult (- 1 /*indexOnHeap*/ , errQueueStopped )
856
+ cb .onEnqueueResult (- 1 /*indexOnHeap*/ , errQueueStopped )
857
857
return false , errQueueStopped
858
858
}
859
859
@@ -866,14 +866,14 @@ func (bq *baseQueue) addInternal(
866
866
if log .V (3 ) {
867
867
log .Infof (ctx , "queue disabled" )
868
868
}
869
- processCallback .onEnqueueResult (- 1 /*indexOnHeap*/ , errQueueDisabled )
869
+ cb .onEnqueueResult (- 1 /*indexOnHeap*/ , errQueueDisabled )
870
870
return false , errQueueDisabled
871
871
}
872
872
}
873
873
874
874
// If the replica is currently in purgatory, don't re-add it.
875
875
if _ , ok := bq .mu .purgatory [desc .RangeID ]; ok {
876
- processCallback .onEnqueueResult (- 1 /*indexOnHeap*/ , errReplicaAlreadyInPurgatory )
876
+ cb .onEnqueueResult (- 1 /*indexOnHeap*/ , errReplicaAlreadyInPurgatory )
877
877
return false , nil
878
878
}
879
879
@@ -883,7 +883,7 @@ func (bq *baseQueue) addInternal(
883
883
if item .processing {
884
884
wasRequeued := item .requeue
885
885
item .requeue = true
886
- processCallback .onEnqueueResult (- 1 /*indexOnHeap*/ , errReplicaAlreadyProcessing )
886
+ cb .onEnqueueResult (- 1 /*indexOnHeap*/ , errReplicaAlreadyProcessing )
887
887
return ! wasRequeued , nil
888
888
}
889
889
@@ -895,8 +895,8 @@ func (bq *baseQueue) addInternal(
895
895
log .Infof (ctx , "updating priority: %0.3f -> %0.3f" , item .priority , priority )
896
896
}
897
897
bq .mu .priorityQ .update (item , priority )
898
- // item.index should be updated now based on heap property now.
899
- processCallback .onEnqueueResult (item .index /*indexOnHeap*/ , nil )
898
+ // item.index should be updated now based on heap property now.
899
+ cb .onEnqueueResult (item .index /*indexOnHeap*/ , nil )
900
900
}
901
901
return false , nil
902
902
}
@@ -905,7 +905,7 @@ func (bq *baseQueue) addInternal(
905
905
log .Infof (ctx , "adding: priority=%0.3f" , priority )
906
906
}
907
907
item = & replicaItem {rangeID : desc .RangeID , replicaID : replicaID , priority : priority }
908
- item .registerCallback (processCallback )
908
+ item .registerCallback (cb )
909
909
bq .addLocked (item )
910
910
911
911
// If adding this replica has pushed the queue past its maximum size, remove
@@ -922,8 +922,8 @@ func (bq *baseQueue) addInternal(
922
922
priority , replicaItemToDrop .replicaID )
923
923
// TODO(wenyihu6): when we introduce base queue max size cluster setting,
924
924
// remember to invoke this callback when shrinking the size
925
- for _ , cb := range replicaItemToDrop .callbacks {
926
- cb .onEnqueueResult (- 1 /*indexOnHeap*/ , errDroppedDueToFullQueueSize )
925
+ for _ , callback := range replicaItemToDrop .callbacks {
926
+ callback .onEnqueueResult (- 1 /*indexOnHeap*/ , errDroppedDueToFullQueueSize )
927
927
}
928
928
bq .removeLocked (replicaItemToDrop )
929
929
}
@@ -934,7 +934,7 @@ func (bq *baseQueue) addInternal(
934
934
// No need to signal again.
935
935
}
936
936
// Note: it may already be dropped or dropped afterwards.
937
- processCallback .onEnqueueResult (item .index /*indexOnHeap*/ , nil )
937
+ cb .onEnqueueResult (item .index /*indexOnHeap*/ , nil )
938
938
return true , nil
939
939
}
940
940
@@ -1384,7 +1384,7 @@ func (bq *baseQueue) addToPurgatoryLocked(
1384
1384
repl replicaInQueue ,
1385
1385
purgErr PurgatoryError ,
1386
1386
priorityAtEnqueue float64 ,
1387
- processCallback []processCallback ,
1387
+ cbs []processCallback ,
1388
1388
) {
1389
1389
bq .mu .AssertHeld ()
1390
1390
@@ -1413,7 +1413,7 @@ func (bq *baseQueue) addToPurgatoryLocked(
1413
1413
replicaID : repl .ReplicaID (),
1414
1414
index : - 1 ,
1415
1415
priority : priorityAtEnqueue ,
1416
- callbacks : processCallback ,
1416
+ callbacks : cbs ,
1417
1417
}
1418
1418
1419
1419
bq .mu .replicas [repl .GetRangeID ()] = item
0 commit comments