Skip to content

Commit 3cd1dfe

Browse files
authored
chore(metric): support busy workers count. (#35)
* chore(metric): support busy workers count. Signed-off-by: Bo-Yi Wu <[email protected]> * chore: update testing Signed-off-by: Bo-Yi Wu <[email protected]>
1 parent 3c20ffd commit 3cd1dfe

File tree

7 files changed

+63
-11
lines changed

7 files changed

+63
-11
lines changed

consumer.go

Lines changed: 24 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -15,12 +15,25 @@ var errMaxCapacity = errors.New("max capacity reached")
1515

1616
// Worker for simple queue using channel
1717
type Consumer struct {
18-
taskQueue chan QueuedMessage
19-
runFunc func(context.Context, QueuedMessage) error
20-
stop chan struct{}
21-
logger Logger
22-
stopOnce sync.Once
23-
stopFlag int32
18+
taskQueue chan QueuedMessage
19+
runFunc func(context.Context, QueuedMessage) error
20+
stop chan struct{}
21+
logger Logger
22+
stopOnce sync.Once
23+
stopFlag int32
24+
busyWorkers uint64
25+
}
26+
27+
func (s *Consumer) incBusyWorker() {
28+
atomic.AddUint64(&s.busyWorkers, 1)
29+
}
30+
31+
func (s *Consumer) decBusyWorker() {
32+
atomic.AddUint64(&s.busyWorkers, ^uint64(0))
33+
}
34+
35+
func (s *Consumer) BusyWorkers() uint64 {
36+
return atomic.LoadUint64(&s.busyWorkers)
2437
}
2538

2639
// BeforeRun run script before start worker
@@ -39,7 +52,11 @@ func (s *Consumer) handle(job Job) error {
3952
panicChan := make(chan interface{}, 1)
4053
startTime := time.Now()
4154
ctx, cancel := context.WithTimeout(context.Background(), job.Timeout)
42-
defer cancel()
55+
s.incBusyWorker()
56+
defer func() {
57+
cancel()
58+
s.decBusyWorker()
59+
}()
4360

4461
// run the job
4562
go func() {

consumer_test.go

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -357,3 +357,30 @@ func TestTaskJobComplete(t *testing.T) {
357357
}
358358
assert.Equal(t, context.DeadlineExceeded, w.handle(job))
359359
}
360+
361+
func TestBusyWorkerCount(t *testing.T) {
362+
job := Job{
363+
Timeout: 200 * time.Millisecond,
364+
Task: func(ctx context.Context) error {
365+
time.Sleep(100 * time.Millisecond)
366+
return nil
367+
},
368+
}
369+
370+
w := NewConsumer()
371+
372+
assert.Equal(t, uint64(0), w.BusyWorkers())
373+
go func() {
374+
assert.NoError(t, w.handle(job))
375+
}()
376+
go func() {
377+
assert.NoError(t, w.handle(job))
378+
}()
379+
380+
time.Sleep(50 * time.Millisecond)
381+
assert.Equal(t, uint64(2), w.BusyWorkers())
382+
time.Sleep(100 * time.Millisecond)
383+
assert.Equal(t, uint64(0), w.BusyWorkers())
384+
385+
assert.NoError(t, w.Shutdown())
386+
}

queue_test.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ func TestNewQueue(t *testing.T) {
2929
assert.NotNil(t, q)
3030

3131
q.Start()
32+
assert.Equal(t, uint64(0), w.BusyWorkers())
3233
q.Shutdown()
3334
q.Wait()
3435
}
@@ -48,6 +49,7 @@ func TestWorkerNum(t *testing.T) {
4849
q.Start()
4950
time.Sleep(20 * time.Millisecond)
5051
assert.Equal(t, 4, q.Workers())
52+
assert.Equal(t, uint64(0), w.BusyWorkers())
5153
q.Shutdown()
5254
q.Wait()
5355
}
@@ -203,6 +205,7 @@ func TestQueueTaskJob(t *testing.T) {
203205
return nil
204206
}))
205207
time.Sleep(50 * time.Millisecond)
208+
assert.Equal(t, uint64(0), w.BusyWorkers())
206209
q.Shutdown()
207210
assert.Equal(t, ErrQueueShutdown, q.QueueTask(func(ctx context.Context) error {
208211
return nil

worker.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@ type Worker interface {
1616
Capacity() int
1717
// Usage is how many message in queue
1818
Usage() int
19+
// BusyWorkers return count of busy worker currently
20+
BusyWorkers() uint64
1921
}
2022

2123
// QueuedMessage ...

worker_empty.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,3 +12,4 @@ func (w *emptyWorker) Shutdown() error { return nil }
1212
func (w *emptyWorker) Queue(job QueuedMessage) error { return nil }
1313
func (w *emptyWorker) Capacity() int { return 0 }
1414
func (w *emptyWorker) Usage() int { return 0 }
15+
func (w *emptyWorker) BusyWorkers() uint64 { return uint64(0) }

worker_message.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,5 +37,6 @@ func (w *messageWorker) Queue(job QueuedMessage) error {
3737
return errors.New("max capacity reached")
3838
}
3939
}
40-
func (w *messageWorker) Capacity() int { return cap(w.messages) }
41-
func (w *messageWorker) Usage() int { return len(w.messages) }
40+
func (w *messageWorker) Capacity() int { return cap(w.messages) }
41+
func (w *messageWorker) Usage() int { return len(w.messages) }
42+
func (w *messageWorker) BusyWorkers() uint64 { return uint64(0) }

worker_task.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,5 +38,6 @@ func (w *taskWorker) Queue(job QueuedMessage) error {
3838
return errors.New("max capacity reached")
3939
}
4040
}
41-
func (w *taskWorker) Capacity() int { return cap(w.messages) }
42-
func (w *taskWorker) Usage() int { return len(w.messages) }
41+
func (w *taskWorker) Capacity() int { return cap(w.messages) }
42+
func (w *taskWorker) Usage() int { return len(w.messages) }
43+
func (w *taskWorker) BusyWorkers() uint64 { return uint64(0) }

0 commit comments

Comments
 (0)