Skip to content

Commit e939502

Browse files
authored
Merge pull request #4 from FishGoddess/develop
v0.2.1-alpha
2 parents 938b1f7 + 65225c4 commit e939502

File tree

8 files changed

+81
-12
lines changed

8 files changed

+81
-12
lines changed

FUTURE.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
* [x] 支持设置 worker 任务队列大小
1111
* [x] 支持设置 panic 处理函数
1212
* [x] 支持自定义 sync.Locker 实现
13-
* [ ] 完善单元测试,将覆盖率提高到 95% 以上
13+
* [x] 完善单元测试,将覆盖率提高到 95% 以上
1414

1515
### v0.1.0
1616

HISTORY.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,12 @@
11
## ✒ 历史版本的特性介绍 (Features in old versions)
22

3+
### v0.2.1-alpha
4+
5+
> 此版本发布于 2025-06-25
6+
7+
* 支持选择 sync.Mutex 作为锁实现
8+
* 单元测试覆盖率提高到 95%
9+
310
### v0.2.0-alpha
411

512
> 此版本发布于 2025-06-25

_icons/coverage.svg

Lines changed: 2 additions & 2 deletions
Loading

executor.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@ import (
88
"sync"
99
)
1010

11+
// Executor executes tasks concurrently using limited goroutines.
12+
// You can specify the number of workers and the queue size of each worker.
1113
type Executor struct {
1214
conf *config
1315

@@ -19,6 +21,7 @@ type Executor struct {
1921
lock sync.Locker
2022
}
2123

24+
// NewExecutor creates a new executor with given worker number and options.
2225
func NewExecutor(workerNum int, opts ...Option) *Executor {
2326
conf := newDefaultConfig(workerNum)
2427
for _, opt := range opts {
@@ -49,7 +52,8 @@ func NewExecutor(workerNum int, opts ...Option) *Executor {
4952
return executor
5053
}
5154

52-
func (e *Executor) Submit(task func()) {
55+
// Submit submits a task to be handled by workers.
56+
func (e *Executor) Submit(task Task) {
5357
e.lock.Lock()
5458
defer e.lock.Unlock()
5559

@@ -65,10 +69,12 @@ func (e *Executor) Submit(task func()) {
6569
}
6670
}
6771

72+
// Wait waits all tasks to be handled.
6873
func (e *Executor) Wait() {
6974
e.wg.Wait()
7075
}
7176

77+
// Close closes the executor after handling all tasks.
7278
func (e *Executor) Close() {
7379
e.lock.Lock()
7480
defer e.lock.Unlock()

option.go

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,15 +21,26 @@ func WithWorkerQueueSize(size int) Option {
2121
}
2222

2323
// WithRecoverFunc sets the recover function.
24-
func WithRecoverFunc(f func(r any)) Option {
24+
func WithRecoverFunc(recoverFunc func(r any)) Option {
2525
return func(conf *config) {
26-
conf.recoverFunc = f
26+
conf.recoverFunc = recoverFunc
2727
}
2828
}
2929

3030
// WithNewLockerFunc sets the new locker function.
31-
func WithNewLockerFunc(f func() sync.Locker) Option {
31+
func WithNewLockerFunc(newLockerFunc func() sync.Locker) Option {
3232
return func(conf *config) {
33-
conf.newLockerFunc = f
33+
conf.newLockerFunc = newLockerFunc
34+
}
35+
}
36+
37+
// WithSyncMutex sets the new locker function returns sync.Mutex.
38+
func WithSyncMutex() Option {
39+
newLockerFunc := func() sync.Locker {
40+
return new(sync.Mutex)
41+
}
42+
43+
return func(conf *config) {
44+
conf.newLockerFunc = newLockerFunc
3445
}
3546
}

option_test.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,3 +45,15 @@ func TestWithNewLockerFunc(t *testing.T) {
4545
t.Fatalf("conf.newLockerFunc %p != newLockerFunc %p", conf.newLockerFunc, newLockerFunc)
4646
}
4747
}
48+
49+
// go test -v -cover -run=^TestWithSyncMutex$
50+
func TestWithSyncMutex(t *testing.T) {
51+
workerNum := 16
52+
conf := newDefaultConfig(workerNum)
53+
WithSyncMutex()(conf)
54+
55+
lock := conf.newLockerFunc()
56+
if _, ok := lock.(*sync.Mutex); !ok {
57+
t.Fatalf("lock %T is not *sync.Mutex", lock)
58+
}
59+
}

worker.go

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ func newWorker(executor *Executor) *worker {
1919
return w
2020
}
2121

22-
func (w *worker) handle(task func()) {
22+
func (w *worker) handle(task Task) {
2323
defer func() {
2424
if r := recover(); r != nil {
2525
w.executor.conf.recover(r)
@@ -33,6 +33,7 @@ func (w *worker) work() {
3333
w.executor.wg.Add(1)
3434
go func() {
3535
defer w.executor.wg.Done()
36+
defer close(w.taskQueue)
3637

3738
for task := range w.taskQueue {
3839
if task == nil {
@@ -41,13 +42,11 @@ func (w *worker) work() {
4142

4243
w.handle(task)
4344
}
44-
45-
close(w.taskQueue)
4645
}()
4746
}
4847

4948
// Accept accepts a task to be handled.
50-
func (w *worker) Accept(task func()) {
49+
func (w *worker) Accept(task Task) {
5150
if task == nil {
5251
return
5352
}

worker_test.go

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,3 +3,37 @@
33
// license that can be found in the LICENSE file.
44

55
package goes
6+
7+
import "testing"
8+
9+
// go test -v -cover -run=^TestWorkerHandle$
10+
func TestWorkerHandle(t *testing.T) {
11+
got := 0
12+
want := 666
13+
14+
executor := &Executor{
15+
conf: &config{
16+
recoverFunc: func(r any) {
17+
got = r.(int)
18+
},
19+
},
20+
}
21+
22+
worker := &worker{executor: executor}
23+
worker.handle(func() {
24+
panic(want)
25+
})
26+
27+
if got != want {
28+
t.Fatalf("got %d != want %d", got, want)
29+
}
30+
31+
want = 123
32+
worker.handle(func() {
33+
got = 123
34+
})
35+
36+
if got != want {
37+
t.Fatalf("got %d != want %d", got, want)
38+
}
39+
}

0 commit comments

Comments
 (0)