Skip to content

Commit 135e60d

Browse files
committed
optimize work pool wait function
1 parent 0402806 commit 135e60d

File tree

2 files changed

+41
-50
lines changed

2 files changed

+41
-50
lines changed

queue/queue.go

Lines changed: 13 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
package queue
22

3-
43
import (
54
"runtime"
65
"sync"
@@ -10,7 +9,7 @@ import (
109
"gopkg.in/eapache/queue.v1"
1110
)
1211

13-
//Queue queue
12+
// Queue queue
1413
type Queue struct {
1514
sync.Mutex
1615
popable *sync.Cond
@@ -21,7 +20,7 @@ type Queue struct {
2120
once sync.Once
2221
}
2322

24-
//New 创建
23+
// New
2524
func New() *Queue {
2625
ch := &Queue{
2726
buffer: queue.New(),
@@ -30,7 +29,7 @@ func New() *Queue {
3029
return ch
3130
}
3231

33-
//Pop 取出队列,(阻塞模式)
32+
// Pop
3433
func (q *Queue) Pop() (v interface{}) {
3534
c := q.popable
3635

@@ -41,7 +40,7 @@ func (q *Queue) Pop() (v interface{}) {
4140
c.Wait()
4241
}
4342

44-
if q.closed { //已关闭
43+
if q.closed {
4544
return
4645
}
4746

@@ -54,7 +53,7 @@ func (q *Queue) Pop() (v interface{}) {
5453
return
5554
}
5655

57-
// TryPop 试着取出队列(非阻塞模式)返回ok == false 表示空
56+
// TryPop
5857
func (q *Queue) TryPop() (v interface{}, ok bool) {
5958
buffer := q.buffer
6059

@@ -73,7 +72,7 @@ func (q *Queue) TryPop() (v interface{}, ok bool) {
7372
return
7473
}
7574

76-
// TryPopTimeout 试着取出队列(塞模式+timeout)返回ok == false 表示超时
75+
// TryPopTimeout
7776
func (q *Queue) TryPopTimeout(tm time.Duration) (v interface{}, ok bool) {
7877
q.once.Do(func() {
7978
q.cc = make(chan interface{}, 1)
@@ -96,7 +95,7 @@ func (q *Queue) TryPopTimeout(tm time.Duration) (v interface{}, ok bool) {
9695
return
9796
}
9897

99-
//Pop 取出队列,(阻塞模式)
98+
// Pop
10099
func (q *Queue) popChan(v *chan interface{}) {
101100
c := q.popable
102101

@@ -107,7 +106,7 @@ func (q *Queue) popChan(v *chan interface{}) {
107106
c.Wait()
108107
}
109108

110-
if q.closed { //已关闭
109+
if q.closed {
111110
*v <- nil
112111
return
113112
}
@@ -124,7 +123,7 @@ func (q *Queue) popChan(v *chan interface{}) {
124123
return
125124
}
126125

127-
// Push 插入队列,非阻塞
126+
// Push
128127
func (q *Queue) Push(v interface{}) {
129128
q.Mutex.Lock()
130129
defer q.Mutex.Unlock()
@@ -135,7 +134,7 @@ func (q *Queue) Push(v interface{}) {
135134
}
136135
}
137136

138-
// Len 获取队列长度
137+
// Len
139138
func (q *Queue) Len() int {
140139
return (int)(atomic.LoadInt32(&q.count))
141140
}
@@ -148,7 +147,7 @@ func (q *Queue) Close() {
148147
if !q.closed {
149148
q.closed = true
150149
atomic.StoreInt32(&q.count, 0)
151-
q.popable.Broadcast() //广播
150+
q.popable.Broadcast()
152151
}
153152
}
154153

@@ -157,13 +156,13 @@ func (q *Queue) IsClose() bool {
157156
return q.closed
158157
}
159158

160-
//Wait 等待队列消费完成
159+
// Wait
161160
func (q *Queue) Wait() {
162161
for {
163162
if q.closed || q.Len() == 0 {
164163
break
165164
}
166165

167-
runtime.Gosched() //出让时间片
166+
runtime.Gosched()
168167
}
169168
}

workpool/workpool.go

Lines changed: 28 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,11 @@
11
package workpool
22

33
import (
4-
"sync"
5-
"time"
6-
"context"
4+
"context"
75
"runtime"
6+
"sync"
87
"sync/atomic"
8+
"time"
99

1010
"github.com/sunvim/utils/queue"
1111
)
@@ -16,16 +16,17 @@ type TaskHandler func() error
1616
// WorkPool serves incoming connections via a pool of workers
1717
type WorkPool struct {
1818
closed int32
19-
isQueTask int32 // Mark whether queue retrieval is task. 标记是否队列取出任务
19+
isQueTask int32 // Mark whether queue retrieval is task.
2020
errChan chan error // error chan
2121
timeout time.Duration // max timeout
2222
wg sync.WaitGroup
2323
task chan TaskHandler
2424
waitingQueue *queue.Queue
25+
workerNum int
2526
}
2627

2728
// New new workpool and set the max number of concurrencies
28-
func New(max int) *WorkPool { // 注册工作池,并设置最大并发数
29+
func New(max int) *WorkPool {
2930
if max < 1 {
3031
max = 1
3132
}
@@ -34,28 +35,28 @@ func New(max int) *WorkPool { // 注册工作池,并设置最大并发数
3435
task: make(chan TaskHandler, 2*max),
3536
errChan: make(chan error, 1),
3637
waitingQueue: queue.New(),
38+
workerNum: max,
3739
}
3840

3941
go p.loop(max)
4042
return p
4143
}
4244

4345
// SetTimeout Setting timeout time
44-
func (p *WorkPool) SetTimeout(timeout time.Duration) { // 设置超时时间
46+
func (p *WorkPool) SetTimeout(timeout time.Duration) {
4547
p.timeout = timeout
4648
}
4749

4850
// Do Add to the workpool and return immediately
49-
func (p *WorkPool) Do(fn TaskHandler) { // 添加到工作池,并立即返回
50-
if p.IsClosed() { // 已关闭
51+
func (p *WorkPool) Do(fn TaskHandler) {
52+
if p.IsClosed() {
5153
return
5254
}
5355
p.waitingQueue.Push(fn)
54-
// p.task <- fn
5556
}
5657

5758
// DoWait Add to the workpool and wait for execution to complete before returning
58-
func (p *WorkPool) DoWait(task TaskHandler) { // 添加到工作池,并等待执行完成之后再返回
59+
func (p *WorkPool) DoWait(task TaskHandler) {
5960
if p.IsClosed() { // closed
6061
return
6162
}
@@ -69,22 +70,23 @@ func (p *WorkPool) DoWait(task TaskHandler) { // 添加到工作池,并等待
6970
}
7071

7172
// Wait Waiting for the worker thread to finish executing
72-
func (p *WorkPool) Wait() error { // 等待工作线程执行结束
73-
p.waitingQueue.Wait() // 等待队列结束
74-
p.waitingQueue.Close() //
75-
p.waitTask() // wait que down
73+
func (p *WorkPool) Wait() error {
74+
p.waitingQueue.Wait()
75+
p.waitTask() // wait que down
7676
close(p.task)
77-
p.wg.Wait() // 等待结束
77+
p.wg.Wait() // wait all task finished
7878
select {
7979
case err := <-p.errChan:
80+
p.task = make(chan TaskHandler, p.workerNum*2)
8081
return err
8182
default:
83+
p.task = make(chan TaskHandler, p.workerNum*2)
8284
return nil
8385
}
8486
}
8587

8688
// IsDone Determine whether it is complete (non-blocking)
87-
func (p *WorkPool) IsDone() bool { // 判断是否完成 (非阻塞)
89+
func (p *WorkPool) IsDone() bool {
8890
if p == nil || p.task == nil {
8991
return true
9092
}
@@ -93,7 +95,7 @@ func (p *WorkPool) IsDone() bool { // 判断是否完成 (非阻塞)
9395
}
9496

9597
// IsClosed Has it been closed?
96-
func (p *WorkPool) IsClosed() bool { // 是否已经关闭
98+
func (p *WorkPool) IsClosed() bool {
9799
if atomic.LoadInt32(&p.closed) == 1 { // closed
98100
return true
99101
}
@@ -121,11 +123,9 @@ func (p *WorkPool) startQueue() {
121123
atomic.StoreInt32(&p.isQueTask, 0)
122124
}
123125

124-
125-
126126
func (p *WorkPool) waitTask() {
127127
for {
128-
runtime.Gosched() // 出让时间片
128+
runtime.Gosched()
129129
if p.IsDone() {
130130
if atomic.LoadInt32(&p.isQueTask) == 0 {
131131
break
@@ -134,46 +134,40 @@ func (p *WorkPool) waitTask() {
134134
}
135135
}
136136

137-
138-
139137
func (p *WorkPool) loop(maxWorkersCount int) {
140-
go p.startQueue() // Startup queue , 启动队列
138+
go p.startQueue() // Startup queue
141139

142-
p.wg.Add(maxWorkersCount) // Maximum number of work cycles,最大的工作协程数
143-
// Start Max workers, 启动max个worker
140+
p.wg.Add(maxWorkersCount) // Maximum number of work cycles
141+
// Start Max workers
144142
for i := 0; i < maxWorkersCount; i++ {
145143
go func() {
146144
defer p.wg.Done()
147-
// worker 开始干活
145+
148146
for wt := range p.task {
149-
if wt == nil || atomic.LoadInt32(&p.closed) == 1 { // returns immediately,有err 立即返回
150-
continue // It needs to be consumed before returning.需要先消费完了之后再返回,
147+
if wt == nil || atomic.LoadInt32(&p.closed) == 1 { // returns immediately
148+
continue // It needs to be consumed before returning.
151149
}
152150

153151
closed := make(chan struct{}, 1)
154-
// Set timeout, priority task timeout.有设置超时,优先task 的超时
152+
// Set timeout, priority task timeout.
155153
if p.timeout > 0 {
156154
ct, cancel := context.WithTimeout(context.Background(), p.timeout)
157155
go func() {
158156
select {
159157
case <-ct.Done():
160158
p.errChan <- ct.Err()
161-
// if atomic.LoadInt32(&p.closed) != 1 {
162-
// mylog.Error(ct.Err())
163159
atomic.StoreInt32(&p.closed, 1)
164160
cancel()
165161
case <-closed:
166162
}
167163
}()
168164
}
169165

170-
err := wt() // Points of Execution.真正执行的点
166+
err := wt() // Points of Execution.
171167
close(closed)
172168
if err != nil {
173169
select {
174170
case p.errChan <- err:
175-
// if atomic.LoadInt32(&p.closed) != 1 {
176-
// mylog.Error(err)
177171
atomic.StoreInt32(&p.closed, 1)
178172
default:
179173
}
@@ -182,5 +176,3 @@ func (p *WorkPool) loop(maxWorkersCount int) {
182176
}()
183177
}
184178
}
185-
186-

0 commit comments

Comments
 (0)