Skip to content

Commit f4bc970

Browse files
authored
feat(worker): update node count dynamically (#52)
1 parent 94512c6 commit f4bc970

File tree

2 files changed

+75
-5
lines changed

2 files changed

+75
-5
lines changed

consumer_test.go

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -321,3 +321,66 @@ func TestTaskJobComplete(t *testing.T) {
321321
}
322322
assert.Equal(t, context.DeadlineExceeded, w.handle(job))
323323
}
324+
325+
func TestIncreaseWorkerCount(t *testing.T) {
326+
w := NewConsumer(
327+
WithLogger(NewEmptyLogger()),
328+
WithFn(func(ctx context.Context, m QueuedMessage) error {
329+
time.Sleep(500 * time.Millisecond)
330+
return nil
331+
}),
332+
)
333+
q, err := NewQueue(
334+
WithLogger(NewLogger()),
335+
WithWorker(w),
336+
WithWorkerCount(5),
337+
)
338+
assert.NoError(t, err)
339+
340+
for i := 1; i <= 10; i++ {
341+
m := mockMessage{
342+
message: fmt.Sprintf("new message: %d", i),
343+
}
344+
assert.NoError(t, q.Queue(m))
345+
}
346+
347+
q.Start()
348+
time.Sleep(100 * time.Millisecond)
349+
assert.Equal(t, 5, q.BusyWorkers())
350+
q.UpdateWorkerCount(10)
351+
time.Sleep(100 * time.Millisecond)
352+
assert.Equal(t, 10, q.BusyWorkers())
353+
q.Release()
354+
}
355+
356+
func TestDecreaseWorkerCount(t *testing.T) {
357+
w := NewConsumer(
358+
WithFn(func(ctx context.Context, m QueuedMessage) error {
359+
time.Sleep(100 * time.Millisecond)
360+
return nil
361+
}),
362+
)
363+
q, err := NewQueue(
364+
WithLogger(NewLogger()),
365+
WithWorker(w),
366+
WithWorkerCount(5),
367+
)
368+
assert.NoError(t, err)
369+
370+
for i := 1; i <= 10; i++ {
371+
m := mockMessage{
372+
message: fmt.Sprintf("test message: %d", i),
373+
}
374+
assert.NoError(t, q.Queue(m))
375+
}
376+
377+
q.Start()
378+
time.Sleep(20 * time.Millisecond)
379+
assert.Equal(t, 5, q.BusyWorkers())
380+
q.UpdateWorkerCount(3)
381+
time.Sleep(100 * time.Millisecond)
382+
assert.Equal(t, 3, q.BusyWorkers())
383+
time.Sleep(100 * time.Millisecond)
384+
assert.Equal(t, 2, q.BusyWorkers())
385+
q.Release()
386+
}

queue.go

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -220,7 +220,18 @@ func (q *Queue) work(task QueuedMessage) {
220220
}
221221
}
222222

223+
func (q *Queue) UpdateWorkerCount(num int) {
224+
q.workerCount = num
225+
q.schedule()
226+
}
227+
223228
func (q *Queue) schedule() {
229+
q.Lock()
230+
defer q.Unlock()
231+
if q.BusyWorkers() >= q.workerCount {
232+
return
233+
}
234+
224235
select {
225236
case q.ready <- struct{}{}:
226237
default:
@@ -280,11 +291,7 @@ func (q *Queue) start() {
280291
}
281292

282293
// check worker number
283-
q.Lock()
284-
if q.BusyWorkers() < q.workerCount {
285-
q.schedule()
286-
}
287-
q.Unlock()
294+
q.schedule()
288295

289296
// get worker to execute new task
290297
select {

0 commit comments

Comments
 (0)