Skip to content

Commit f529662

Browse files
authored
Merge pull request #1 from lxzan/dev
Zero Allocs FIFO
2 parents 8b2c36a + e018da7 commit f529662

File tree

17 files changed

+566
-209
lines changed

17 files changed

+566
-209
lines changed

.github/workflows/go.yml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,3 +22,5 @@ jobs:
2222
go-version: 1.20.3
2323
- name: Test
2424
run: go test -v ./...
25+
- name: Bench
26+
run: make bench

.gitignore

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,5 +16,3 @@ vendor/
1616
examples/
1717
.idea/
1818
.vacode/
19-
go.work
20-
go.work.sum

Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
test:
2-
go test ./...
2+
go test -count=1 ./...
33

44
bench:
55
go test -benchmem -run=^$$ -bench . github.com/lxzan/concurrency/benchmark

README.md

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
### Install
99

1010
```bash
11-
GOPROXY=https://goproxy.cn go get -v github.com/lxzan/concurrency@latest
11+
go get -v github.com/lxzan/concurrency@latest
1212
```
1313

1414
#### Usage
@@ -52,6 +52,7 @@ func main() {
5252
package main
5353

5454
import (
55+
"context"
5556
"fmt"
5657
"github.com/lxzan/concurrency/queues"
5758
"sync/atomic"
@@ -67,7 +68,7 @@ func main() {
6768
atomic.AddInt64(&sum, x)
6869
})
6970
}
70-
w.Stop()
71+
w.Stop(context.Background())
7172
fmt.Printf("sum=%d\n", sum)
7273
}
7374
```
@@ -80,16 +81,16 @@ func main() {
8081

8182
```
8283
go test -benchmem -run=^$ -bench . github.com/lxzan/concurrency/benchmark
83-
goos: darwin
84-
goarch: arm64
84+
goos: linux
85+
goarch: amd64
8586
pkg: github.com/lxzan/concurrency/benchmark
86-
Benchmark_Fib-8 1534509 775.5 ns/op 0 B/op 0 allocs/op
87-
Benchmark_StdGo-8 390 3078647 ns/op 160585 B/op 10002 allocs/op
88-
Benchmark_QueuesSingle-8 262 4388264 ns/op 345144 B/op 10898 allocs/op
89-
Benchmark_QueuesMultiple-8 470 2630718 ns/op 323923 B/op 10964 allocs/op
90-
Benchmark_Ants-8 178 6708482 ns/op 160374 B/op 10004 allocs/op
91-
Benchmark_GoPool-8 348 3487154 ns/op 194926 B/op 10511 allocs/op
87+
cpu: AMD Ryzen 5 PRO 4650G with Radeon Graphics
88+
Benchmark_Fib-12 1000000 1146 ns/op 0 B/op 0 allocs/op
89+
Benchmark_StdGo-12 3661 317905 ns/op 16064 B/op 1001 allocs/op
90+
Benchmark_QueuesSingle-12 2178 532224 ns/op 67941 B/op 1098 allocs/op
91+
Benchmark_QueuesMultiple-12 3691 317757 ns/op 61648 B/op 1256 allocs/op
92+
Benchmark_Ants-12 1569 751802 ns/op 22596 B/op 1097 allocs/op
93+
Benchmark_GoPool-12 2910 406935 ns/op 19042 B/op 1093 allocs/op
9294
PASS
93-
ok github.com/lxzan/concurrency/benchmark 10.107s
94-
95+
ok github.com/lxzan/concurrency/benchmark 7.271s
9596
```

benchmark/benchmark_test.go

Lines changed: 26 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -10,10 +10,17 @@ import (
1010

1111
const (
1212
Concurrency = 16
13-
M = 10000
13+
M = 1000
1414
N = 13
1515
)
1616

17+
func newJob(wg *sync.WaitGroup) func() {
18+
return func() {
19+
fib(N)
20+
wg.Done()
21+
}
22+
}
23+
1724
func Benchmark_Fib(b *testing.B) {
1825
for i := 0; i < b.N; i++ {
1926
fib(N)
@@ -24,43 +31,43 @@ func Benchmark_StdGo(b *testing.B) {
2431
for i := 0; i < b.N; i++ {
2532
wg := &sync.WaitGroup{}
2633
wg.Add(M)
34+
job := newJob(wg)
2735
for j := 0; j < M; j++ {
28-
go func() {
29-
fib(N)
30-
wg.Done()
31-
}()
36+
go job()
3237
}
3338
wg.Wait()
3439
}
3540
}
3641

3742
func Benchmark_QueuesSingle(b *testing.B) {
38-
q := queues.New(queues.WithConcurrency(Concurrency))
43+
q := queues.New(
44+
queues.WithConcurrency(Concurrency),
45+
queues.WithSharding(1),
46+
)
3947

4048
for i := 0; i < b.N; i++ {
4149
wg := &sync.WaitGroup{}
4250
wg.Add(M)
51+
job := newJob(wg)
4352
for j := 0; j < M; j++ {
44-
q.Push(func() {
45-
fib(N)
46-
wg.Done()
47-
})
53+
q.Push(job)
4854
}
4955
wg.Wait()
5056
}
5157
}
5258

5359
func Benchmark_QueuesMultiple(b *testing.B) {
54-
q := queues.New(queues.WithConcurrency(1), queues.WithMultiple(Concurrency))
60+
q := queues.New(
61+
queues.WithConcurrency(1),
62+
queues.WithSharding(Concurrency),
63+
)
5564

5665
for i := 0; i < b.N; i++ {
5766
wg := &sync.WaitGroup{}
5867
wg.Add(M)
68+
job := newJob(wg)
5969
for j := 0; j < M; j++ {
60-
q.Push(func() {
61-
fib(N)
62-
wg.Done()
63-
})
70+
q.Push(job)
6471
}
6572
wg.Wait()
6673
}
@@ -73,11 +80,9 @@ func Benchmark_Ants(b *testing.B) {
7380
for i := 0; i < b.N; i++ {
7481
wg := &sync.WaitGroup{}
7582
wg.Add(M)
83+
job := newJob(wg)
7684
for j := 0; j < M; j++ {
77-
q.Submit(func() {
78-
fib(N)
79-
wg.Done()
80-
})
85+
q.Submit(job)
8186
}
8287
wg.Wait()
8388
}
@@ -89,11 +94,9 @@ func Benchmark_GoPool(b *testing.B) {
8994
for i := 0; i < b.N; i++ {
9095
wg := &sync.WaitGroup{}
9196
wg.Add(M)
97+
job := newJob(wg)
9298
for j := 0; j < M; j++ {
93-
q.Go(func() {
94-
fib(N)
95-
wg.Done()
96-
})
99+
q.Go(job)
97100
}
98101
wg.Wait()
99102
}

go.work

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
go 1.20
2+
3+
use (
4+
.
5+
./benchmark
6+
)

groups/group.go

Lines changed: 4 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -13,16 +13,7 @@ const (
1313
defaultWaitTimeout = 60 * time.Second // 默认线程同步等待超时
1414
)
1515

16-
var ErrWaitTimeout = errors.New("wait timeout")
17-
1816
type (
19-
options struct {
20-
timeout time.Duration
21-
concurrency int64
22-
caller Caller
23-
cancel bool
24-
}
25-
2617
Caller func(args any, f func(any) error) error
2718

2819
Group[T any] struct {
@@ -34,6 +25,7 @@ type (
3425
taskDone int64 // 已完成任务数量
3526
taskTotal int64 // 总任务数量
3627
OnMessage func(args T) error // 任务处理
28+
OnError func(err error) // 错误处理
3729
}
3830
)
3931

@@ -58,6 +50,7 @@ func New[T any](opts ...Option) *Group[T] {
5850
c.OnMessage = func(args T) error {
5951
return nil
6052
}
53+
c.OnError = func(err error) {}
6154
return c
6255
}
6356

@@ -105,6 +98,7 @@ func (c *Group[T]) do(args T) {
10598
c.mu.Lock()
10699
c.errs = append(c.errs, err)
107100
c.mu.Unlock()
101+
c.OnError(err)
108102
}
109103

110104
if c.incrAndIsDone() {
@@ -161,6 +155,6 @@ func (c *Group[T]) Start() error {
161155
return errors.Join(c.errs...)
162156
case <-ctx.Done():
163157
c.clear()
164-
return ErrWaitTimeout
158+
return ctx.Err()
165159
}
166160
}

groups/options.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,13 @@ import (
77
"unsafe"
88
)
99

10+
type options struct {
11+
timeout time.Duration
12+
concurrency int64
13+
caller Caller
14+
cancel bool
15+
}
16+
1017
type Option func(o *options)
1118

1219
// WithTimeout 设置任务超时时间

internal/queue.go

Lines changed: 126 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,126 @@
1+
package internal
2+
3+
type (
4+
pointer uint32
5+
6+
element[T any] struct {
7+
prev, addr, next pointer
8+
Value T
9+
}
10+
11+
Queue[T any] struct {
12+
head, tail pointer // 头尾指针
13+
length int // 长度
14+
stack stack // 回收站
15+
elements []element[T] // 元素列表
16+
template element[T] // 空值模板
17+
}
18+
)
19+
20+
func NewQueue[T any](capacity uint32) *Queue[T] {
21+
return &Queue[T]{elements: make([]element[T], 1, 1+capacity)}
22+
}
23+
24+
func (c *Queue[T]) get(addr pointer) *element[T] {
25+
if addr > 0 {
26+
return &(c.elements[addr])
27+
}
28+
return nil
29+
}
30+
31+
func (c *Queue[T]) getElement() *element[T] {
32+
if c.stack.Len() > 0 {
33+
addr := c.stack.Pop()
34+
v := c.get(addr)
35+
v.addr = addr
36+
return v
37+
}
38+
39+
addr := pointer(len(c.elements))
40+
c.elements = append(c.elements, c.template)
41+
v := c.get(addr)
42+
v.addr = addr
43+
return v
44+
}
45+
46+
func (c *Queue[T]) Reset() {
47+
c.head, c.tail, c.length = 0, 0, 0
48+
c.stack = c.stack[:0]
49+
c.elements = c.elements[:1]
50+
}
51+
52+
func (c *Queue[T]) Len() int {
53+
return c.length
54+
}
55+
56+
func (c *Queue[T]) Front() (value T) {
57+
if c.head > 0 {
58+
return c.get(c.head).Value
59+
}
60+
return value
61+
}
62+
63+
func (c *Queue[T]) Push(value T) {
64+
ele := c.getElement()
65+
ele.Value = value
66+
c.length++
67+
68+
if c.tail != 0 {
69+
tail := c.get(c.tail)
70+
tail.next = ele.addr
71+
ele.prev = tail.addr
72+
c.tail = ele.addr
73+
return
74+
}
75+
76+
c.head = ele.addr
77+
c.tail = ele.addr
78+
}
79+
80+
func (c *Queue[T]) Pop() (value T) {
81+
ele := c.get(c.head)
82+
if ele == nil {
83+
return value
84+
}
85+
86+
c.head = ele.next
87+
if head := c.get(c.head); head != nil {
88+
head.prev = 0
89+
}
90+
91+
c.length--
92+
value = ele.Value
93+
c.stack.Push(ele.addr)
94+
*ele = c.template
95+
if c.length == 0 {
96+
c.tail = 0
97+
c.Reset()
98+
}
99+
100+
return value
101+
}
102+
103+
func (c *Queue[T]) Range(f func(v T) bool) {
104+
for i := c.get(c.head); i != nil; i = c.get(i.next) {
105+
if !f(i.Value) {
106+
break
107+
}
108+
}
109+
}
110+
111+
type stack []pointer
112+
113+
func (c *stack) Len() int {
114+
return len(*c)
115+
}
116+
117+
func (c *stack) Push(v pointer) {
118+
*c = append(*c, v)
119+
}
120+
121+
func (c *stack) Pop() pointer {
122+
var n = c.Len()
123+
var v = (*c)[n-1]
124+
*c = (*c)[:n-1]
125+
return v
126+
}

0 commit comments

Comments
 (0)