
Commit a6626fa

Only poll work when there are slots to work on
1 parent 33a428e

File tree

4 files changed: +1259 −20 lines

internal/worker/worker.go

Lines changed: 24 additions & 20 deletions
@@ -27,7 +27,7 @@ type Worker[Task, TaskResult any] struct {
 
     tw TaskWorker[Task, TaskResult]
 
-    taskQueue chan *Task
+    taskQueue *workQueue[Task]
 
     logger *slog.Logger
 
@@ -64,7 +64,7 @@ func NewWorker[Task, TaskResult any](
     return &Worker[Task, TaskResult]{
         tw:             tw,
         options:        options,
-        taskQueue:      make(chan *Task),
+        taskQueue:      newWorkQueue[Task](options.MaxParallelTasks),
        logger:         b.Options().Logger,
         dispatcherDone: make(chan struct{}, 1),
     }
@@ -91,7 +91,7 @@ func (w *Worker[Task, TaskResult]) WaitForCompletion() error {
     w.pollersWg.Wait()
 
     // Wait for tasks to finish
-    close(w.taskQueue)
+    close(w.taskQueue.tasks)
     <-w.dispatcherDone
 
     return nil
@@ -114,16 +114,32 @@ func (w *Worker[Task, TaskResult]) poller(ctx context.Context) {
         default:
         }
 
+        // Reserve slot for work we might get. This blocks if there are no slots available.
+        if err := w.taskQueue.reserve(ctx); err != nil {
+            if errors.Is(err, context.Canceled) {
+                return
+            }
+        }
+
         task, err := w.poll(ctx, 30*time.Second)
         if err != nil {
             if !errors.Is(err, context.Canceled) {
                 w.logger.ErrorContext(ctx, "error polling task", "error", err)
             }
         } else if task != nil {
-            w.taskQueue <- task
+            if err := w.taskQueue.add(ctx, task); err != nil {
+                if !errors.Is(err, context.Canceled) {
+                    w.logger.ErrorContext(ctx, "error adding task to queue", "error", err)
+                    w.taskQueue.release()
+                }
+            }
             continue // check for new tasks right away
+        } else {
+            // Did not use the reserved slot, release
+            w.taskQueue.release()
         }
 
+        // Optionally wait between unsuccessful polling attempts
         if w.options.PollingInterval > 0 {
             select {
             case <-ticker.C:
@@ -135,40 +151,28 @@ func (w *Worker[Task, TaskResult]) poller(ctx context.Context) {
 }
 
 func (w *Worker[Task, TaskResult]) dispatcher() {
-    var sem chan struct{}
-
-    if w.options.MaxParallelTasks > 0 {
-        sem = make(chan struct{}, w.options.MaxParallelTasks)
-    }
-
     var wg sync.WaitGroup
 
-    for t := range w.taskQueue {
-        // If limited max tasks, wait for a slot to open up
-        if sem != nil {
-            sem <- struct{}{}
-        }
-
+    for t := range w.taskQueue.tasks {
         wg.Add(1)
 
         t := t
         go func() {
+            defer w.taskQueue.release()
             defer wg.Done()
 
             // Create new context to allow tasks to complete when root context is canceled
             taskCtx := context.Background()
             if err := w.handle(taskCtx, t); err != nil {
                 w.logger.ErrorContext(taskCtx, "error handling task", "error", err)
             }
-
-            if sem != nil {
-                <-sem
-            }
         }()
     }
 
+    // Wait for all pending tasks to finish
     wg.Wait()
 
+    // Then notify anyone waiting for this that the dispatcher is done.
     w.dispatcherDone <- struct{}{}
 }
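The workQueue type this diff introduces is defined elsewhere in the commit; this file shows only its call sites. As a minimal sketch, assuming an unbuffered task channel plus a buffered channel used as a counting semaphore, an implementation consistent with newWorkQueue, reserve, add, release, and the tasks field might look like the following (the names come from the diff, but the internals are an assumption, not the committed code):

package worker

import "context"

type workQueue[Task any] struct {
    tasks chan *Task    // handed from pollers to the dispatcher
    sem   chan struct{} // one token per reserved or running task; nil means unlimited
}

func newWorkQueue[Task any](maxParallelTasks int) *workQueue[Task] {
    q := &workQueue[Task]{tasks: make(chan *Task)}
    if maxParallelTasks > 0 {
        q.sem = make(chan struct{}, maxParallelTasks)
    }
    return q
}

// reserve claims a slot before polling, blocking until one frees up or ctx is canceled.
func (q *workQueue[Task]) reserve(ctx context.Context) error {
    if q.sem == nil {
        return nil // no parallelism limit, nothing to reserve
    }
    select {
    case q.sem <- struct{}{}:
        return nil
    case <-ctx.Done():
        return ctx.Err()
    }
}

// add hands a polled task to the dispatcher; the caller must hold a reserved slot.
func (q *workQueue[Task]) add(ctx context.Context, t *Task) error {
    select {
    case q.tasks <- t:
        return nil
    case <-ctx.Done():
        return ctx.Err()
    }
}

// release returns a slot, either after a task finishes or when a reservation goes unused.
func (q *workQueue[Task]) release() {
    if q.sem != nil {
        <-q.sem
    }
}

Read this way, the commit moves the concurrency limit from the dispatcher into the poller: a slot is claimed before asking for work, so a worker never polls a task it cannot start immediately. Each reservation is then released exactly once, by the dispatcher's deferred release() after handle returns, or by the poller itself when the poll comes back empty or the add fails.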
