Skip to content

Commit 32f7bad

Browse files
committed
queue: adopt stopper handle
1 parent bfda6a4 commit 32f7bad

File tree

1 file changed

+12
-8
lines changed

1 file changed

+12
-8
lines changed

pkg/kv/kvserver/queue.go

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -598,18 +598,22 @@ func (bq *baseQueue) Async(
598598
log.InfofDepth(ctx, 2, "%s", redact.Safe(opName))
599599
}
600600
opName += " (" + bq.name + ")"
601-
bgCtx := bq.AnnotateCtx(context.Background())
602-
if err := bq.store.stopper.RunAsyncTaskEx(bgCtx,
603-
stop.TaskOpts{
601+
bgCtx, hdl, err := bq.store.stopper.GetHandle(
602+
bq.AnnotateCtx(context.Background()), stop.TaskOpts{
604603
TaskName: opName,
605604
Sem: bq.addOrMaybeAddSem,
606605
WaitForSem: wait,
607-
},
608-
func(ctx context.Context) {
609-
fn(ctx, baseQueueHelper{bq})
610-
}); err != nil && bq.addLogN.ShouldLog() {
611-
log.Infof(ctx, "rate limited in %s: %s", redact.Safe(opName), err)
606+
})
607+
if err != nil {
608+
if bq.addLogN.ShouldLog() {
609+
log.Infof(ctx, "rate limited in %s: %s", redact.Safe(opName), err)
610+
}
611+
return
612612
}
613+
go func(ctx context.Context) {
614+
defer hdl.Activate(ctx).Release(ctx)
615+
fn(ctx, baseQueueHelper{bq})
616+
}(bgCtx)
613617
}
614618

615619
// MaybeAddAsync offers the replica to the queue. The queue will only process a

0 commit comments

Comments
 (0)