Skip to content

Commit 9793749

Browse files
authored
Merge pull request #393 from cschleiden/limited-worker-task-queue
Only poll work when there are slots to work on
2 parents 33a428e + 1e8fc85 commit 9793749

File tree

4 files changed

+1259
-20
lines changed

4 files changed

+1259
-20
lines changed

internal/worker/worker.go

Lines changed: 25 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ type Worker[Task, TaskResult any] struct {
2727

2828
tw TaskWorker[Task, TaskResult]
2929

30-
taskQueue chan *Task
30+
taskQueue *workQueue[Task]
3131

3232
logger *slog.Logger
3333

@@ -64,7 +64,7 @@ func NewWorker[Task, TaskResult any](
6464
return &Worker[Task, TaskResult]{
6565
tw: tw,
6666
options: options,
67-
taskQueue: make(chan *Task),
67+
taskQueue: newWorkQueue[Task](options.MaxParallelTasks),
6868
logger: b.Options().Logger,
6969
dispatcherDone: make(chan struct{}, 1),
7070
}
@@ -91,7 +91,7 @@ func (w *Worker[Task, TaskResult]) WaitForCompletion() error {
9191
w.pollersWg.Wait()
9292

9393
// Wait for tasks to finish
94-
close(w.taskQueue)
94+
close(w.taskQueue.tasks)
9595
<-w.dispatcherDone
9696

9797
return nil
@@ -114,16 +114,33 @@ func (w *Worker[Task, TaskResult]) poller(ctx context.Context) {
114114
default:
115115
}
116116

117+
// Reserve slot for work we might get. This blocks if there are no slots available.
118+
if err := w.taskQueue.reserve(ctx); err != nil {
119+
if errors.Is(err, context.Canceled) {
120+
return
121+
}
122+
}
123+
117124
task, err := w.poll(ctx, 30*time.Second)
118125
if err != nil {
119126
if !errors.Is(err, context.Canceled) {
120127
w.logger.ErrorContext(ctx, "error polling task", "error", err)
128+
w.taskQueue.release()
121129
}
122130
} else if task != nil {
123-
w.taskQueue <- task
131+
if err := w.taskQueue.add(ctx, task); err != nil {
132+
if !errors.Is(err, context.Canceled) {
133+
w.logger.ErrorContext(ctx, "error adding task to queue", "error", err)
134+
w.taskQueue.release()
135+
}
136+
}
124137
continue // check for new tasks right away
138+
} else {
139+
// Did not use the reserved slot, release
140+
w.taskQueue.release()
125141
}
126142

143+
// Optionally wait between unsuccessful polling attempts
127144
if w.options.PollingInterval > 0 {
128145
select {
129146
case <-ticker.C:
@@ -135,40 +152,28 @@ func (w *Worker[Task, TaskResult]) poller(ctx context.Context) {
135152
}
136153

137154
func (w *Worker[Task, TaskResult]) dispatcher() {
138-
var sem chan struct{}
139-
140-
if w.options.MaxParallelTasks > 0 {
141-
sem = make(chan struct{}, w.options.MaxParallelTasks)
142-
}
143-
144155
var wg sync.WaitGroup
145156

146-
for t := range w.taskQueue {
147-
// If limited max tasks, wait for a slot to open up
148-
if sem != nil {
149-
sem <- struct{}{}
150-
}
151-
157+
for t := range w.taskQueue.tasks {
152158
wg.Add(1)
153159

154160
t := t
155161
go func() {
162+
defer w.taskQueue.release()
156163
defer wg.Done()
157164

158165
// Create new context to allow tasks to complete when root context is canceled
159166
taskCtx := context.Background()
160167
if err := w.handle(taskCtx, t); err != nil {
161168
w.logger.ErrorContext(taskCtx, "error handling task", "error", err)
162169
}
163-
164-
if sem != nil {
165-
<-sem
166-
}
167170
}()
168171
}
169172

173+
// Wait for all pending tasks to finish
170174
wg.Wait()
171175

176+
// Then notify anyone waiting for this that the dispatcher is done.
172177
w.dispatcherDone <- struct{}{}
173178
}
174179

0 commit comments

Comments
 (0)