Skip to content

Commit a982c04

Browse files
committed
fix leaked goroutine
1 parent f065c10 commit a982c04

File tree

2 files changed

+10
-5
lines changed

2 files changed

+10
-5
lines changed

internal/internal_worker_base.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -252,7 +252,7 @@ func (bw *baseWorker) runPoller() {
252252
select {
253253
case <-bw.shutdownCh:
254254
return
255-
case <-bw.dynamic.TaskPermit.AcquireChan(bw.limiterContext): // don't poll unless there is a task permit
255+
case <-bw.dynamic.TaskPermit.AcquireChan(bw.limiterContext, &bw.shutdownWG): // don't poll unless there is a task permit
256256
// TODO move to a centralized place inside the worker
257257
// emit metrics on concurrent task permit quota and current task permit count
258258
// NOTE task permit doesn't mean there is a task running, it still needs to poll until it gets a task to process

internal/worker/dynamic_params.go

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ package worker
2323
import (
2424
"context"
2525
"fmt"
26+
"sync"
2627

2728
"github.com/marusama/semaphore/v2"
2829
)
@@ -38,7 +39,7 @@ type DynamicParams struct {
3839
// Permit is an adaptive
3940
type Permit interface {
4041
Acquire(context.Context, int) error
41-
AcquireChan(context.Context) <-chan struct{}
42+
AcquireChan(context.Context, *sync.WaitGroup) <-chan struct{}
4243
Quota() int
4344
SetQuota(int)
4445
Count() int
@@ -63,15 +64,19 @@ func (p *permit) Acquire(ctx context.Context, count int) error {
6364
}
6465

6566
// AcquireChan returns a permit ready channel. It's closed then permit is acquired
66-
func (p *permit) AcquireChan(ctx context.Context) <-chan struct{} {
67+
func (p *permit) AcquireChan(ctx context.Context, wg *sync.WaitGroup) <-chan struct{} {
6768
ch := make(chan struct{})
69+
wg.Add(1)
6870
go func() {
71+
defer wg.Done()
6972
if err := p.sem.Acquire(ctx, 1); err != nil {
7073
close(ch)
7174
return
7275
}
73-
ch <- struct{}{}
74-
close(ch)
76+
select { // try to send to channel, but don't block if listener is gone
77+
case ch <- struct{}{}:
78+
default:
79+
}
7580
}()
7681
return ch
7782
}

0 commit comments

Comments
 (0)