Skip to content

Commit 385935a

Browse files
committed
propagate context
1 parent 6c9af52 commit 385935a

File tree

2 files changed

+4
-4
lines changed

2 files changed

+4
-4
lines changed

internal/internal_worker_base.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -252,7 +252,7 @@ func (bw *baseWorker) runPoller() {
252252
select {
253253
case <-bw.shutdownCh:
254254
return
255-
case <-bw.dynamic.TaskPermit.AcquireChan(): // don't poll unless there is a task permit
255+
case <-bw.dynamic.TaskPermit.AcquireChan(bw.limiterContext): // don't poll unless there is a task permit
256256
// TODO move to a centralized place inside the worker
257257
// emit metrics on concurrent task permit quota and current task permit count
258258
// NOTE task permit doesn't mean there is a task running, it still needs to poll until it gets a task to process

internal/worker/dynamic_params.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ type DynamicParams struct {
3838
// Permit is an adaptive
3939
type Permit interface {
4040
Acquire(context.Context, int) error
41-
AcquireChan() <-chan struct{}
41+
AcquireChan(context.Context) <-chan struct{}
4242
Quota() int
4343
SetQuota(int)
4444
Count() int
@@ -63,10 +63,10 @@ func (p *permit) Acquire(ctx context.Context, count int) error {
6363
}
6464

6565
// AcquireChan returns a permit ready channel. It's closed then permit is acquired
66-
func (p *permit) AcquireChan() <-chan struct{} {
66+
func (p *permit) AcquireChan(ctx context.Context) <-chan struct{} {
6767
ch := make(chan struct{})
6868
go func() {
69-
if err := p.sem.Acquire(nil, 1); err != nil { // nil context indicates no need to exit on context done
69+
if err := p.sem.Acquire(ctx, 1); err != nil {
7070
close(ch)
7171
return
7272
}

0 commit comments

Comments
 (0)