Skip to content

Commit a023310

Browse files
authored
Merge pull request #5 from FishGoddess/develop
v0.2.2-alpha
2 parents e939502 + fb0d6cc commit a023310

18 files changed

+386
-37
lines changed

FUTURE.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44

55
* [x] 增加异步任务执行器
66
* [x] 支持轮询调度策略
7-
* [ ] 支持随机调度策略
7+
* [x] 支持随机调度策略
88
* [ ] 支持任务数量优先调度策略
99
* [ ] 支持任务时间优先调度策略
1010
* [x] 支持设置 worker 任务队列大小

HISTORY.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,12 @@
11
## ✒ 历史版本的特性介绍 (Features in old versions)
22

3+
### v0.2.2-alpha
4+
5+
> 此版本发布于 2025-06-26
6+
7+
* 抽象出 workers 接口层
8+
* 增加随机调度策略
9+
310
### v0.2.1-alpha
411

512
> 此版本发布于 2025-06-25

README.en.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,10 @@
1111

1212
### 🥇 Features
1313

14-
* Supports spin lock with backoff strategy.
1514
* Limits the number of simultaneous goroutines and not reuses them by Limiter.
1615
* Limits the number of simultaneous goroutines and reuses them by Executor.
16+
* Supports multiple scheduling strategies, including round robin, random, etc.
17+
* Supports spin lock with backoff strategy.
1718

1819
_Check [HISTORY.md](./HISTORY.md) and [FUTURE.md](./FUTURE.md) to know about more information._
1920

README.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,10 @@
1111

1212
### 🥇 功能特性
1313

14-
* 内置退避策略的自旋锁。
1514
* 支持只限制同时执行的协程数,但不复用协程,使用 Limiter。
1615
* 支持限制同时执行的协程数,且复用协程,使用 Executor。
16+
* 支持多种调度策略,包括轮询、随机等。
17+
* 支持使用退避策略的自旋锁。
1718

1819
_历史版本的特性请查看 [HISTORY.md](./HISTORY.md)。未来版本的新特性和计划请查看 [FUTURE.md](./FUTURE.md)_
1920

_icons/coverage.svg

Lines changed: 2 additions & 2 deletions
Loading

config.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ type config struct {
1515
workerQueueSize int
1616
recoverFunc func(r any)
1717
newLockerFunc func() sync.Locker
18+
newWorkersFunc func(workerNum int) workers
1819
}
1920

2021
func newDefaultConfig(workerNum int) *config {
@@ -23,6 +24,7 @@ func newDefaultConfig(workerNum int) *config {
2324
workerQueueSize: 64,
2425
recoverFunc: nil,
2526
newLockerFunc: nil,
27+
newWorkersFunc: nil,
2628
}
2729
}
2830

@@ -39,3 +41,11 @@ func (c *config) newLocker() sync.Locker {
3941

4042
return c.newLockerFunc()
4143
}
44+
45+
func (c *config) newWorkers() workers {
46+
if c.newWorkersFunc == nil {
47+
return newRoundRobinWorkers(c.workerNum)
48+
}
49+
50+
return c.newWorkersFunc(c.workerNum)
51+
}

config_test.go

Lines changed: 38 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@ import (
88
"fmt"
99
"sync"
1010
"testing"
11+
12+
"github.com/FishGoddess/goes/pkg/spinlock"
1113
)
1214

1315
// go test -v -cover -run=^TestNewDefaultConfig$
@@ -43,16 +45,50 @@ func TestConfigRecover(t *testing.T) {
4345
func TestConfigNewLocker(t *testing.T) {
4446
workerNum := 16
4547
conf := newDefaultConfig(workerNum)
46-
conf.newLocker()
48+
49+
got := conf.newLocker()
50+
if _, ok := got.(*spinlock.Lock); !ok {
51+
t.Fatalf("got %T is not *spinlock.Lock", got)
52+
}
4753

4854
want := &sync.Mutex{}
4955
conf.newLockerFunc = func() sync.Locker {
5056
return want
5157
}
5258

53-
got := conf.newLocker()
59+
got = conf.newLocker()
60+
if fmt.Sprintf("%p", got) != fmt.Sprintf("%p", want) {
61+
t.Fatalf("got %p != want %p", got, want)
62+
}
63+
}
5464

65+
// go test -v -cover -run=^TestConfigNewWorkers$
66+
func TestConfigNewWorkers(t *testing.T) {
67+
workerNum := 16
68+
conf := newDefaultConfig(workerNum)
69+
70+
got := conf.newWorkers()
71+
if _, ok := got.(*roundRobinWorkers); !ok {
72+
t.Fatalf("got %T is not *roundRobinWorkers", got)
73+
}
74+
75+
want := &roundRobinWorkers{}
76+
conf.newWorkersFunc = func(workerNum int) workers {
77+
want.workers = make([]*worker, workerNum)
78+
return want
79+
}
80+
81+
got = conf.newWorkers()
5582
if fmt.Sprintf("%p", got) != fmt.Sprintf("%p", want) {
5683
t.Fatalf("got %p != want %p", got, want)
5784
}
85+
86+
rrWorkers, ok := got.(*roundRobinWorkers)
87+
if !ok {
88+
t.Fatalf("got %T is not *roundRobinWorkers", got)
89+
}
90+
91+
if len(rrWorkers.workers) != workerNum {
92+
t.Fatalf("len(rrWorkers.workers) %d != workerNum %d", len(rrWorkers.workers), workerNum)
93+
}
5894
}

executor.go

Lines changed: 21 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -5,16 +5,24 @@
55
package goes
66

77
import (
8+
"errors"
89
"sync"
910
)
1011

12+
var (
13+
ErrExecutorClosed = errors.New("goes: executor is closed")
14+
ErrWorkerIsNil = errors.New("goes: worker is nil")
15+
)
16+
17+
// Task is a function can be executed by executor.
18+
type Task = func()
19+
1120
// Executor executes tasks concurrently using limited goroutines.
1221
// You can specify the number of workers and the queue size of each worker.
1322
type Executor struct {
1423
conf *config
1524

16-
workers []*worker
17-
index int
25+
workers workers
1826
closed bool
1927

2028
wg sync.WaitGroup
@@ -38,35 +46,35 @@ func NewExecutor(workerNum int, opts ...Option) *Executor {
3846

3947
executor := &Executor{
4048
conf: conf,
41-
workers: make([]*worker, 0, conf.workerNum),
42-
index: 0,
49+
workers: conf.newWorkers(),
4350
closed: false,
4451
lock: conf.newLocker(),
4552
}
4653

4754
for range conf.workerNum {
4855
worker := newWorker(executor)
49-
executor.workers = append(executor.workers, worker)
56+
executor.workers.Add(worker)
5057
}
5158

5259
return executor
5360
}
5461

5562
// Submit submits a task to be handled by workers.
56-
func (e *Executor) Submit(task Task) {
63+
func (e *Executor) Submit(task Task) error {
5764
e.lock.Lock()
5865
defer e.lock.Unlock()
5966

6067
if e.closed {
61-
return
68+
return ErrExecutorClosed
6269
}
6370

64-
worker := e.workers[e.index]
65-
worker.Accept(task)
66-
67-
if e.index++; e.index >= len(e.workers) {
68-
e.index = 0
71+
worker := e.workers.Next()
72+
if worker == nil {
73+
return ErrWorkerIsNil
6974
}
75+
76+
worker.Accept(task)
77+
return nil
7078
}
7179

7280
// Wait waits all tasks to be handled.
@@ -79,10 +87,7 @@ func (e *Executor) Close() {
7987
e.lock.Lock()
8088
defer e.lock.Unlock()
8189

82-
for _, worker := range e.workers {
83-
worker.Done()
84-
}
85-
90+
e.workers.Done()
8691
e.wg.Wait()
8792
e.closed = true
8893
}

executor_test.go

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,3 +47,23 @@ func TestExecutor(t *testing.T) {
4747
t.Fatalf("gotTotalCount %d != totalCount %d", gotTotalCount, totalCount)
4848
}
4949
}
50+
51+
// go test -v -cover -run=^TestExecutorError$
52+
func TestExecutorError(t *testing.T) {
53+
workerNum := 16
54+
executor := NewExecutor(workerNum)
55+
executor.Close()
56+
57+
err := executor.Submit(func() {})
58+
if err != ErrExecutorClosed {
59+
t.Fatalf("err %v != ErrExecutorClosed %v", err, ErrExecutorClosed)
60+
}
61+
62+
executor = NewExecutor(workerNum)
63+
executor.workers = &roundRobinWorkers{}
64+
65+
err = executor.Submit(func() {})
66+
if err != ErrWorkerIsNil {
67+
t.Fatalf("err %v != ErrWorkerIsNil %v", err, ErrWorkerIsNil)
68+
}
69+
}

option.go

Lines changed: 39 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,11 @@
44

55
package goes
66

7-
import "sync"
7+
import (
8+
"sync"
9+
10+
"github.com/FishGoddess/goes/pkg/spinlock"
11+
)
812

913
// Option is for setting config.
1014
type Option func(conf *config)
@@ -34,7 +38,18 @@ func WithNewLockerFunc(newLockerFunc func() sync.Locker) Option {
3438
}
3539
}
3640

37-
// WithSyncMutex sets the new locker function returns sync.Mutex.
41+
// WithSpinLock sets the new locker function returns spin lock.
42+
func WithSpinLock() Option {
43+
newLockerFunc := func() sync.Locker {
44+
return spinlock.New()
45+
}
46+
47+
return func(conf *config) {
48+
conf.newLockerFunc = newLockerFunc
49+
}
50+
}
51+
52+
// WithSyncMutex sets the new locker function returns sync mutex.
3853
func WithSyncMutex() Option {
3954
newLockerFunc := func() sync.Locker {
4055
return new(sync.Mutex)
@@ -44,3 +59,25 @@ func WithSyncMutex() Option {
4459
conf.newLockerFunc = newLockerFunc
4560
}
4661
}
62+
63+
// WithRoundRobinWorkers sets the new workers function using round robin strategy.
64+
func WithRoundRobinWorkers() Option {
65+
newWorkersFunc := func(workerNum int) workers {
66+
return newRoundRobinWorkers(workerNum)
67+
}
68+
69+
return func(conf *config) {
70+
conf.newWorkersFunc = newWorkersFunc
71+
}
72+
}
73+
74+
// WithRandomWorkers sets the new workers function using random strategy.
75+
func WithRandomWorkers() Option {
76+
newWorkersFunc := func(workerNum int) workers {
77+
return newRandomWorkers(workerNum)
78+
}
79+
80+
return func(conf *config) {
81+
conf.newWorkersFunc = newWorkersFunc
82+
}
83+
}

0 commit comments

Comments
 (0)