Skip to content

Commit 8b2c36a

Browse files
author
lixizan
committed
Merge branch 'dev'
2 parents 26c105a + 2c87b86 commit 8b2c36a

File tree

8 files changed

+302
-120
lines changed

8 files changed

+302
-120
lines changed

README.md

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -83,11 +83,13 @@ go test -benchmem -run=^$ -bench . github.com/lxzan/concurrency/benchmark
8383
goos: darwin
8484
goarch: arm64
8585
pkg: github.com/lxzan/concurrency/benchmark
86-
Benchmark_Fib-8 1485490 775.0 ns/op 0 B/op 0 allocs/op
87-
Benchmark_StdGo-8 388 3066459 ns/op 160537 B/op 10002 allocs/op
88-
Benchmark_Queues-8 457 2602319 ns/op 324489 B/op 11061 allocs/op
89-
Benchmark_Ants-8 139 7337507 ns/op 160368 B/op 10004 allocs/op
90-
Benchmark_GoPool-8 264 4514672 ns/op 191897 B/op 10569 allocs/op
86+
Benchmark_Fib-8 1534509 775.5 ns/op 0 B/op 0 allocs/op
87+
Benchmark_StdGo-8 390 3078647 ns/op 160585 B/op 10002 allocs/op
88+
Benchmark_QueuesSingle-8 262 4388264 ns/op 345144 B/op 10898 allocs/op
89+
Benchmark_QueuesMultiple-8 470 2630718 ns/op 323923 B/op 10964 allocs/op
90+
Benchmark_Ants-8 178 6708482 ns/op 160374 B/op 10004 allocs/op
91+
Benchmark_GoPool-8 348 3487154 ns/op 194926 B/op 10511 allocs/op
9192
PASS
92-
ok github.com/lxzan/concurrency/benchmark 8.500s
93+
ok github.com/lxzan/concurrency/benchmark 10.107s
94+
9395
```

benchmark/benchmark_test.go

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ func Benchmark_StdGo(b *testing.B) {
3434
}
3535
}
3636

37-
func Benchmark_Queues(b *testing.B) {
37+
func Benchmark_QueuesSingle(b *testing.B) {
3838
q := queues.New(queues.WithConcurrency(Concurrency))
3939

4040
for i := 0; i < b.N; i++ {
@@ -50,6 +50,22 @@ func Benchmark_Queues(b *testing.B) {
5050
}
5151
}
5252

53+
func Benchmark_QueuesMultiple(b *testing.B) {
54+
q := queues.New(queues.WithConcurrency(1), queues.WithMultiple(Concurrency))
55+
56+
for i := 0; i < b.N; i++ {
57+
wg := &sync.WaitGroup{}
58+
wg.Add(M)
59+
for j := 0; j < M; j++ {
60+
q.Push(func() {
61+
fib(N)
62+
wg.Done()
63+
})
64+
}
65+
wg.Wait()
66+
}
67+
}
68+
5369
func Benchmark_Ants(b *testing.B) {
5470
q, _ := ants.NewPool(Concurrency)
5571
defer q.Release()

groups/options.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,9 @@ func WithTimeout(t time.Duration) Option {
1717
}
1818

1919
// WithConcurrency 设置最大并发
20-
func WithConcurrency(num int64) Option {
20+
func WithConcurrency(n int64) Option {
2121
return func(o *options) {
22-
o.concurrency = num
22+
o.concurrency = n
2323
}
2424
}
2525

queues/multiple_queue.go

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
package queues
2+
3+
import (
4+
"context"
5+
"sync/atomic"
6+
"time"
7+
)
8+
9+
type multipleQueue struct {
10+
options *options // 参数
11+
serial int64 // 序列号
12+
size int64 // 队列大小
13+
stopped atomic.Uint32 // 是否关闭
14+
qs []*singleQueue // 子队列
15+
}
16+
17+
// 创建多重队列
18+
func newMultipleQueue(o *options) *multipleQueue {
19+
qs := make([]*singleQueue, o.size)
20+
for i := int64(0); i < o.size; i++ {
21+
qs[i] = newSingleQueue(o)
22+
}
23+
return &multipleQueue{options: o, qs: qs, size: o.size}
24+
}
25+
26+
// Push 追加任务
27+
func (c *multipleQueue) Push(job Job) {
28+
if c.stopped.Load() == 0 {
29+
index := atomic.AddInt64(&c.serial, 1) & (c.size - 1)
30+
c.qs[index].Push(job)
31+
}
32+
}
33+
34+
// Stop 停止
35+
// 可能需要等待一段时间, 直到所有任务执行完成或者超时
36+
func (c *multipleQueue) Stop() error {
37+
ctx, cancel := context.WithTimeout(context.Background(), c.options.timeout)
38+
ticker := time.NewTicker(50 * time.Millisecond)
39+
defer func() {
40+
cancel()
41+
ticker.Stop()
42+
}()
43+
44+
for {
45+
select {
46+
case <-ticker.C:
47+
if c.doStop(false) {
48+
return nil
49+
}
50+
case <-ctx.Done():
51+
c.doStop(true)
52+
return ErrStopTimeout
53+
}
54+
}
55+
}
56+
57+
func (c *multipleQueue) doStop(force bool) bool {
58+
if force {
59+
c.stopped.Store(1)
60+
return true
61+
}
62+
63+
sum := 0
64+
for _, item := range c.qs {
65+
sum += item.len()
66+
}
67+
if sum == 0 {
68+
c.stopped.Store(1)
69+
return true
70+
}
71+
72+
return false
73+
}

queues/options.go

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,9 @@ import (
1111
type Option func(o *options)
1212

1313
// WithConcurrency 设置最大并发
14-
func WithConcurrency(num int64) Option {
14+
func WithConcurrency(n int64) Option {
1515
return func(o *options) {
16-
o.concurrency = internal.ToBinaryNumber(num)
16+
o.concurrency = n
1717
}
1818
}
1919

@@ -31,6 +31,15 @@ func WithLogger(logger logs.Logger) Option {
3131
}
3232
}
3333

34+
// WithMultiple 设置多重队列, 降低锁竞争开销
35+
// 注意: n会被转化为pow(2,x)
36+
func WithMultiple(n int64) Option {
37+
return func(o *options) {
38+
o.multiple = true
39+
o.size = internal.ToBinaryNumber(n)
40+
}
41+
}
42+
3443
// WithRecovery 设置恢复程序
3544
func WithRecovery() Option {
3645
return func(o *options) {

queues/queue.go

Lines changed: 16 additions & 108 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,8 @@
11
package queues
22

33
import (
4-
"context"
54
"github.com/lxzan/concurrency/logs"
6-
"sync"
7-
"sync/atomic"
5+
"github.com/pkg/errors"
86
"time"
97
)
108

@@ -13,10 +11,12 @@ const (
1311
defaultTimeout = 30 * time.Second
1412
)
1513

16-
var DefaultQueue = New(WithConcurrency(16), WithRecovery())
14+
var ErrStopTimeout = errors.New("stop timeout")
1715

1816
type (
1917
options struct {
18+
multiple bool
19+
size int64
2020
concurrency int64
2121
timeout time.Duration
2222
caller Caller
@@ -25,28 +25,21 @@ type (
2525

2626
Caller func(logger logs.Logger, f func())
2727

28-
queue struct {
29-
mu *sync.Mutex // 锁
30-
q []Job // 任务队列
31-
maxConcurrency int64 // 最大并发
32-
curConcurrency int64 // 当前并发
33-
caller Caller // 异常处理
34-
logger logs.Logger // 日志
35-
}
36-
3728
Job func()
3829

39-
Queue struct {
40-
options *options
41-
serial int64
42-
qs []*queue
30+
Queue interface {
31+
// 追加任务
32+
Push(job Job)
33+
34+
// 停止
35+
// 注意: 此方法有阻塞等待任务结束逻辑; 停止后调用Push方法不会产生任何效果.
36+
Stop() error
4337
}
4438
)
4539

46-
// New
47-
// 创建N条并发度为1的任务队列
48-
func New(opts ...Option) *Queue {
40+
func New(opts ...Option) Queue {
4941
o := &options{
42+
multiple: false,
5043
concurrency: defaultConcurrency,
5144
timeout: defaultTimeout,
5245
caller: func(logger logs.Logger, f func()) { f() },
@@ -56,93 +49,8 @@ func New(opts ...Option) *Queue {
5649
f(o)
5750
}
5851

59-
qs := make([]*queue, o.concurrency)
60-
for i := int64(0); i < o.concurrency; i++ {
61-
qs[i] = newQueue(o)
62-
}
63-
return &Queue{options: o, qs: qs}
64-
}
65-
66-
// Push 追加任务
67-
func (c *Queue) Push(job Job) {
68-
index := atomic.AddInt64(&c.serial, 1) & (c.options.concurrency - 1)
69-
c.qs[index].push(job)
70-
}
71-
72-
// Stop 停止
73-
// 可能需要等待一段时间, 直到所有任务执行完成或者超时
74-
func (c *Queue) Stop() {
75-
ctx, cancel := context.WithTimeout(context.Background(), c.options.timeout)
76-
ticker := time.NewTicker(50 * time.Millisecond)
77-
defer func() {
78-
cancel()
79-
ticker.Stop()
80-
}()
81-
82-
for {
83-
select {
84-
case <-ticker.C:
85-
sum := 0
86-
for _, item := range c.qs {
87-
sum += item.len()
88-
}
89-
if sum == 0 {
90-
return
91-
}
92-
case <-ctx.Done():
93-
return
94-
}
95-
}
96-
}
97-
98-
// newQueue 创建一个任务队列
99-
func newQueue(o *options) *queue {
100-
return &queue{
101-
mu: &sync.Mutex{},
102-
maxConcurrency: 1,
103-
curConcurrency: 0,
104-
caller: o.caller,
105-
logger: o.logger,
106-
}
107-
}
108-
109-
func (c *queue) len() int {
110-
c.mu.Lock()
111-
defer c.mu.Unlock()
112-
return len(c.q)
113-
}
114-
115-
// 获取一个任务
116-
func (c *queue) getJob(delta int64) Job {
117-
c.mu.Lock()
118-
defer c.mu.Unlock()
119-
c.curConcurrency += delta
120-
if c.curConcurrency >= c.maxConcurrency {
121-
return nil
122-
}
123-
if n := len(c.q); n == 0 {
124-
return nil
125-
}
126-
var result = c.q[0]
127-
c.q = c.q[1:]
128-
c.curConcurrency++
129-
return result
130-
}
131-
132-
// 循环执行任务
133-
func (c *queue) do(job Job) {
134-
for job != nil {
135-
c.caller(c.logger, job)
136-
job = c.getJob(-1)
137-
}
138-
}
139-
140-
// push 追加任务, 有资源空闲的话会立即执行
141-
func (c *queue) push(job Job) {
142-
c.mu.Lock()
143-
c.q = append(c.q, job)
144-
c.mu.Unlock()
145-
if item := c.getJob(0); item != nil {
146-
go c.do(item)
52+
if o.multiple {
53+
return newMultipleQueue(o)
14754
}
55+
return newSingleQueue(o)
14856
}

0 commit comments

Comments
 (0)