Skip to content

Commit 278ee81

Browse files
authored
chore: Support new pool (#33)
Signed-off-by: Bo-Yi Wu <[email protected]>
1 parent c6c22d2 commit 278ee81

File tree

3 files changed

+61
-0
lines changed

3 files changed

+61
-0
lines changed

pool.go

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
package queue
2+
3+
func NewPool(size int, opts ...Option) *Queue {
4+
o := []Option{
5+
WithWorkerCount(size),
6+
WithWorker(NewConsumer(opts...)),
7+
}
8+
o = append(
9+
o,
10+
opts...,
11+
)
12+
13+
q, err := NewQueue(o...)
14+
if err != nil {
15+
panic(err)
16+
}
17+
18+
q.Start()
19+
20+
return q
21+
}

pool_test.go

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
package queue
2+
3+
import (
4+
"context"
5+
"testing"
6+
"time"
7+
8+
"github.com/stretchr/testify/assert"
9+
)
10+
11+
func TestNewPoolWithQueueTask(t *testing.T) {
12+
totalN := 5
13+
taskN := 100
14+
rets := make(chan struct{}, taskN)
15+
16+
p := NewPool(totalN)
17+
time.Sleep(time.Millisecond * 50)
18+
assert.Equal(t, totalN, p.Workers())
19+
20+
for i := 0; i < taskN; i++ {
21+
assert.NoError(t, p.QueueTask(func(context.Context) error {
22+
rets <- struct{}{}
23+
return nil
24+
}))
25+
}
26+
27+
for i := 0; i < taskN; i++ {
28+
<-rets
29+
}
30+
31+
// shutdown all, and now running worker is 0
32+
p.Release()
33+
assert.Equal(t, 0, p.Workers())
34+
}

queue.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,12 @@ func (q *Queue) Shutdown() {
9999
})
100100
}
101101

102+
// Workers returns the numbers of workers has been created.
103+
func (q *Queue) Release() {
104+
q.Shutdown()
105+
q.Wait()
106+
}
107+
102108
// Workers returns the numbers of workers has been created.
103109
func (q *Queue) Workers() int {
104110
return int(atomic.LoadInt32(&q.runningWorkers))

0 commit comments

Comments
 (0)