@@ -125,44 +125,47 @@ var noopProcessCallback = processCallback{
125
125
// NB: None of the fields below can be nil. Use noopProcessCallback if you do
126
126
// not need to register any callback.
127
127
//
128
- // NB: These callbacks may be called multiple times:
129
- // 1. onEnqueueResult may be called with error = nil first and called again with
130
- // error = errDroppedDueToFullQueueSize when the replicaItem is later dropped
131
- // before processing due to exceeding max queue size.
132
- // 2. onProcessResult may be called with error first and sent to the purgatory
133
- // queue and called again when the puragtory processes the replica.
128
+ // The callback behavior depends on when it's registered. Currently, addInternal
129
+ // and MaybeAddCallback are the only two users. See comments above them for more
130
+ // details on the exact behaviour.
134
131
//
135
- // NB: It is not a strong guarantee that the callback will be executed since
136
- // removeLocked or removeFromReplicaSetLocked may be called without executing
137
- // the callbacks. That happens when the replica is destroyed or recreated with a
138
- // new replica id.
132
+ // NB: Callback execution is not guaranteed since removeLocked or
133
+ // removeFromReplicaSetLocked may be called without executing callbacks. This
134
+ // happens when the replica is destroyed or recreated with a new replica ID.
139
135
//
140
136
// For now, the two use cases (decommissioning nudger and
141
- // maybeBackpressureBatch) are okay with this behaviour. But adding new uses is
142
- // discouraged without cleaning up the contract of processCallback.
143
- // TODO(wenyihu6): consider clean the semantics up after backports
137
+ // maybeBackpressureBatch) are okay with the current behaviour. But adding new
138
+ // uses is discouraged without cleaning up the contract of processCallback.
139
+ // TODO(wenyihu6): consider cleaning up the semantics after backports
144
140
type processCallback struct {
145
- // onProcessResult is called with the result of a process attempt. It is only
146
- // invoked if the base queue gets a chance to process this replica. It may be
147
- // invoked multiple times: first with a processing error and again with
148
- // purgatory processing error.
149
- onProcessResult func (err error )
150
-
151
141
// onEnqueueResult is called with the result of the enqueue attempt. It is
152
142
// invoked when the range is added to the queue and if the range encounters
153
- // any errors before getting a chance to be popped off the queue and getting
154
- // processed.
155
- //
156
- // This may be invoked multiple times: first with error = nil when
157
- // successfully enqueued at the beginning, and again with an error if the
158
- // replica encounters any errors
143
+ // any errors and being enqueued again before being processed.
159
144
//
160
145
// If error is nil, the index on the priority queue where this item sits is
161
146
// also passed in the callback. If error is non-nil, the index passed in the
162
147
// callback is -1. Note: indexOnHeap does not represent the item's exact rank
163
148
// by priority. It only reflects the item's position in the heap array, which
164
149
// gives a rough idea of where it sits in the priority hierarchy.
150
+ //
151
+ // - May be invoked multiple times:
152
+ // 1. Immediately after successful enqueue (err = nil).
153
+ // 2. If the replica is later dropped due to full queue (err =
154
+ // errDroppedDueToFullQueueSize).
155
+ // 3. If re-added with updated priority (err = nil, new heap index).
156
+ // 4. If the replica is already in the queue and processing.
157
+ // - May be skipped if the replica is already in queue and no priority changes
158
+ // occur.
165
159
onEnqueueResult func (indexOnHeap int , err error )
160
+
161
+ // onProcessResult is called with the result of any process attempts. It is
162
+ // only invoked if the base queue gets a chance to process this replica.
163
+ //
164
+ // - May be invoked multiple times if the replica goes through purgatory or
165
+ // re-processing.
166
+ // - May be skipped if the replica is removed with removeFromReplicaSetLocked
167
+ // or registered with a new replica id before processing begins.
168
+ onProcessResult func (err error )
166
169
}
167
170
168
171
// A replicaItem holds a replica and metadata about its queue state and
@@ -720,9 +723,9 @@ func (bq *baseQueue) MaybeAddAsync(
720
723
})
721
724
}
722
725
723
- // MaybeAddAsyncWithCallback is the same as MaybeAddAsync , but allows the caller
724
- // to register a process callback that will be invoked when the replica is
725
- // enqueued or processed.
726
+ // AddAsyncWithCallback is the same as AddAsync , but allows the caller to
727
+ // register a process callback that will be invoked when the replica is enqueued
728
+ // or processed.
726
729
func (bq * baseQueue ) AddAsyncWithCallback (
727
730
ctx context.Context , repl replicaInQueue , prio float64 , processCallback processCallback ,
728
731
) {
@@ -809,6 +812,27 @@ func (bq *baseQueue) maybeAdd(ctx context.Context, repl replicaInQueue, now hlc.
809
812
// addInternal adds the replica the queue with specified priority. If
810
813
// the replica is already queued at a lower priority, updates the existing
811
814
// priority. Expects the queue lock to be held by caller.
815
+ //
816
+ // processCallback allows the caller to register a callback that will be invoked
817
+ // when the replica is enqueued or processed.
818
+ // - If the replicaItem has not been added to bq.mu.replicas yet, the callback
819
+ // is registered and onEnqueueResult is invoked immediately with the result of
820
+ // the enqueue attempt. If successfully enqueued, onProcessResult will be
821
+ // invoked when processing completes.
822
+ // - If the replicaItem has already been added to bq.mu.replicas, no new
823
+ // callbacks will be registered. onEnqueueResult registered first time will be
824
+ // invoked with the result of enqueue attempts:
825
+ // 1. Already processing or in purgatory: invoked with
826
+ // errReplicaAlreadyProcessing/errReplicaAlreadyInPurgatory
827
+ // 2. Priority updated: invoked with error = nil and new heap index
828
+ // 3. Waiting in queue without priority change: not invoked
829
+ // 4. Dropped due to full queue: invoked with
830
+ // errDroppedDueToFullQueueSizeonEnqueueResult registered first time is
831
+ // invoked with the result of this enqueue attempt.
832
+ // 5. Other errors: invoked with the error.
833
+ //
834
+ // NB: callback invokation is not guanranteed since removeFromReplicaSetLocked
835
+ // may remove the replica from the queue at any time without invoking them.
812
836
func (bq * baseQueue ) addInternal (
813
837
ctx context.Context ,
814
838
desc * roachpb.RangeDescriptor ,
@@ -922,18 +946,21 @@ func (bq *baseQueue) addInternal(
922
946
// Returns false and no callback is executed.
923
947
//
924
948
// - queued: in mu.replicas and mu.priorityQ
925
- // Returns true and callback is executed when the replica is processed.
949
+ // Returns true. onProcessResult is executed when the replica is processed.
926
950
//
927
951
// - purgatory: in mu.replicas and mu.purgatory
928
- // Returns true and the callback is called immediately with the purgatory error.
929
- // Note that the callback may be invoked again when the purgatory finishes
930
- // processing the replica.
952
+ // Returns true and the onProcessResult is called immediately with the
953
+ // purgatory error. Note that the onProcessResult may be invoked again when
954
+ // the purgatory finishes processing the replica. .
931
955
//
932
956
// - processing: only in mu.replicas and currently being processed
933
- // Returns true and callback is executed when processing completes. If the
934
- // replica is currently being processed by the purgatory queue, it will not
935
- // be in bq.mu.purgatory and the callback will only execute when the purgatory
936
- // finishes processing the replica.
957
+ // Returns true and onProcessResult is executed when processing completes.
958
+ // If the replica is currently being processed by the purgatory queue, it
959
+ // will not be in bq.mu.purgatory and the onProcessResult will only execute
960
+ // when the purgatory finishes processing the replica.
961
+ //
962
+ // If it returns true, onEnqueueResult is invoked on subsequent invocations to
963
+ // addInternal as well.
937
964
//
938
965
// NB: Adding new uses is discouraged without cleaning up the contract of
939
966
// processCallback. For example, removeFromReplicaSetLocked may be called
0 commit comments