Skip to content

Commit 4ec0e25

Browse files
authored
Merge pull request #7 from FishGoddess/develop
v0.2.4-alpha
2 parents bc5e0ff + f9276a6 commit 4ec0e25

17 files changed

+154
-40
lines changed

FUTURE.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
* [ ] 支持任务数量优先调度策略
99
* [ ] 支持任务时间优先调度策略
1010
* [ ] 支持 worker 动态扩缩容
11+
* [x] 支持查询可用的 worker 数量
1112
* [x] 支持设置 worker 任务队列大小
1213
* [x] 支持设置 panic 处理函数
1314
* [x] 支持自定义 sync.Locker 实现

HISTORY.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,12 @@
11
## ✒ 历史版本的特性介绍 (Features in old versions)
22

3+
### v0.2.4-alpha
4+
5+
> 此版本发布于 2025-06-28
6+
7+
* 支持查询可用的 worker 数量
8+
* 懒启动 worker 机制,避免浪费 worker 资源
9+
310
### v0.2.3-alpha
411

512
> 此版本发布于 2025-06-28

README.en.md

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
* Limits the number of simultaneous goroutines and reuses them by Executor.
1616
* Supports multiple scheduling strategies, including round robin, random, etc.
1717
* Supports spin lock with backoff strategy.
18+
* Supports getting the number of workers available in the executor.
1819

1920
_Check [HISTORY.md](./HISTORY.md) and [FUTURE.md](./FUTURE.md) to know about more information._
2021

@@ -74,11 +75,11 @@ goarch: amd64
7475
cpu: AMD EPYC 7K62 48-Core Processor
7576

7677
BenchmarkLimiter-2 2417040 498.5 ns/op 24 B/op 1 allocs/op
77-
BenchmarkExecutor-2 23793781 49.9 ns/op 0 B/op 0 allocs/op
78+
BenchmarkExecutor-2 20458502 58.3 ns/op 0 B/op 0 allocs/op
7879
BenchmarkAntsPool-2 4295964 271.7 ns/op 0 B/op 0 allocs/op
7980

8081
BenchmarkLimiterTime-2: num is 1000000, cost is 300.936441ms
81-
BenchmarkExecutorTime-2: num is 1000000, cost is 51.350509ms
82+
BenchmarkExecutorTime-2: num is 1000000, cost is 63.026947ms
8283
BenchmarkAntsPoolTime-2: num is 999744, cost is 346.972287ms
8384
```
8485

README.md

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
* 支持限制同时执行的协程数,且复用协程,使用 Executor。
1616
* 支持多种调度策略,包括轮询、随机等。
1717
* 支持使用退避策略的自旋锁。
18+
* 支持查询可用的 worker 数量。
1819

1920
_历史版本的特性请查看 [HISTORY.md](./HISTORY.md)。未来版本的新特性和计划请查看 [FUTURE.md](./FUTURE.md)_
2021

@@ -74,11 +75,11 @@ goarch: amd64
7475
cpu: AMD EPYC 7K62 48-Core Processor
7576

7677
BenchmarkLimiter-2 2417040 498.5 ns/op 24 B/op 1 allocs/op
77-
BenchmarkExecutor-2 23793781 49.9 ns/op 0 B/op 0 allocs/op
78+
BenchmarkExecutor-2 20458502 58.3 ns/op 0 B/op 0 allocs/op
7879
BenchmarkAntsPool-2 4295964 271.7 ns/op 0 B/op 0 allocs/op
7980

8081
BenchmarkLimiterTime-2: num is 1000000, cost is 300.936441ms
81-
BenchmarkExecutorTime-2: num is 1000000, cost is 51.350509ms
82+
BenchmarkExecutorTime-2: num is 1000000, cost is 63.026947ms
8283
BenchmarkAntsPoolTime-2: num is 999744, cost is 346.972287ms
8384
```
8485

_icons/coverage.svg

Lines changed: 2 additions & 2 deletions
Loading

config.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ type config struct {
1515
workerQueueSize int
1616
recoverFunc func(r any)
1717
newLockerFunc func() sync.Locker
18-
newSchedulerFunc func(workers []*worker) scheduler
18+
newSchedulerFunc func(workers ...*worker) scheduler
1919
}
2020

2121
func newDefaultConfig(workerNum int) *config {
@@ -42,10 +42,10 @@ func (c *config) newLocker() sync.Locker {
4242
return c.newLockerFunc()
4343
}
4444

45-
func (c *config) newScheduler(workers []*worker) scheduler {
45+
func (c *config) newScheduler(workers ...*worker) scheduler {
4646
if c.newSchedulerFunc == nil {
47-
return newRoundRobinScheduler(workers)
47+
return newRoundRobinScheduler(workers...)
4848
}
4949

50-
return c.newSchedulerFunc(workers)
50+
return c.newSchedulerFunc(workers...)
5151
}

config_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -68,19 +68,19 @@ func TestConfigNewScheduler(t *testing.T) {
6868
conf := newDefaultConfig(workerNum)
6969

7070
workers := make([]*worker, workerNum)
71-
got := conf.newScheduler(workers)
71+
got := conf.newScheduler(workers...)
7272

7373
if _, ok := got.(*roundRobinScheduler); !ok {
7474
t.Fatalf("got %T is not *roundRobinScheduler", got)
7575
}
7676

7777
want := &roundRobinScheduler{}
78-
conf.newSchedulerFunc = func(workers []*worker) scheduler {
78+
conf.newSchedulerFunc = func(workers ...*worker) scheduler {
7979
want.workers = workers
8080
return want
8181
}
8282

83-
got = conf.newScheduler(workers)
83+
got = conf.newScheduler(workers...)
8484
if fmt.Sprintf("%p", got) != fmt.Sprintf("%p", want) {
8585
t.Fatalf("got %p != want %p", got, want)
8686
}

executor.go

Lines changed: 40 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,8 @@ import (
1010
)
1111

1212
var (
13-
ErrExecutorClosed = errors.New("goes: executor is closed")
14-
ErrWorkerIsNil = errors.New("goes: worker is nil")
13+
ErrExecutorIsClosed = errors.New("goes: executor is closed")
14+
ErrWorkerIsNil = errors.New("goes: worker is nil")
1515
)
1616

1717
// Task is a function can be executed by executor.
@@ -45,37 +45,62 @@ func NewExecutor(workerNum int, opts ...Option) *Executor {
4545
panic("goes: worker's queue size <= 0")
4646
}
4747

48-
executor := &Executor{
49-
conf: conf,
50-
closed: false,
51-
lock: conf.newLocker(),
52-
}
53-
5448
workers := make([]*worker, 0, conf.workerNum)
55-
for range conf.workerNum {
56-
worker := newWorker(executor)
57-
workers = append(workers, worker)
49+
executor := &Executor{
50+
conf: conf,
51+
workers: workers,
52+
scheduler: conf.newScheduler(),
53+
closed: false,
54+
lock: conf.newLocker(),
5855
}
5956

60-
executor.workers = workers
61-
executor.scheduler = conf.newScheduler(workers)
57+
executor.spawnWorker()
6258
return executor
6359
}
6460

61+
func (e *Executor) spawnWorker() *worker {
62+
worker := newWorker(e)
63+
64+
e.workers = append(e.workers, worker)
65+
e.scheduler.Set(e.workers)
66+
return worker
67+
}
68+
69+
// AvailableWorkers returns the number of workers available in the executor.
70+
func (e *Executor) AvailableWorkers() int {
71+
e.lock.Lock()
72+
defer e.lock.Unlock()
73+
74+
return len(e.workers)
75+
}
76+
6577
// Submit submits a task to be handled by workers.
6678
func (e *Executor) Submit(task Task) error {
6779
e.lock.Lock()
6880
defer e.lock.Unlock()
6981

7082
if e.closed {
71-
return ErrExecutorClosed
83+
return ErrExecutorIsClosed
7284
}
7385

7486
worker := e.scheduler.Get()
7587
if worker == nil {
7688
return ErrWorkerIsNil
7789
}
7890

91+
// We don't need to create a new worker if we got a worker with no tasks.
92+
if worker.WaitingTasks() <= 0 || len(e.workers) >= e.conf.workerNum {
93+
worker.Accept(task)
94+
return nil
95+
}
96+
97+
// The number of workers has reached the limit, so we can only use the worker we got.
98+
if len(e.workers) >= e.conf.workerNum {
99+
worker.Accept(task)
100+
return nil
101+
}
102+
103+
worker = e.spawnWorker()
79104
worker.Accept(task)
80105
return nil
81106
}
@@ -94,6 +119,6 @@ func (e *Executor) Close() {
94119
worker.Done()
95120
}
96121

97-
e.wg.Wait()
122+
e.Wait()
98123
e.closed = true
99124
}

executor_test.go

Lines changed: 51 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -55,8 +55,8 @@ func TestExecutorError(t *testing.T) {
5555
executor.Close()
5656

5757
err := executor.Submit(func() {})
58-
if err != ErrExecutorClosed {
59-
t.Fatalf("err %v != ErrExecutorClosed %v", err, ErrExecutorClosed)
58+
if err != ErrExecutorIsClosed {
59+
t.Fatalf("err %v != ErrExecutorIsClosed %v", err, ErrExecutorIsClosed)
6060
}
6161

6262
executor = NewExecutor(workerNum)
@@ -67,3 +67,52 @@ func TestExecutorError(t *testing.T) {
6767
t.Fatalf("err %v != ErrWorkerIsNil %v", err, ErrWorkerIsNil)
6868
}
6969
}
70+
71+
// go test -v -cover -run=^TestExecutorAvailableWorkers$
72+
func TestExecutorAvailableWorkers(t *testing.T) {
73+
workerNum := 16
74+
executor := NewExecutor(workerNum)
75+
76+
if len(executor.workers) != 1 {
77+
t.Fatalf("len(executor.workers) %d != 1", len(executor.workers))
78+
}
79+
80+
if executor.AvailableWorkers() != 1 {
81+
t.Fatalf("executor.AvailableWorkers() %d != 1", executor.AvailableWorkers())
82+
}
83+
84+
executor.workers = make([]*worker, workerNum)
85+
86+
if executor.AvailableWorkers() != workerNum {
87+
t.Fatalf("executor.AvailableWorkers() %d != workerNum %d", executor.AvailableWorkers(), workerNum)
88+
}
89+
}
90+
91+
// go test -v -cover -run=^TestExecutorSpawnWorker$
92+
func TestExecutorSpawnWorker(t *testing.T) {
93+
workerNum := 16
94+
executor := NewExecutor(workerNum)
95+
96+
if executor.AvailableWorkers() != 1 {
97+
t.Fatalf("executor.AvailableWorkers() %d != 1", executor.AvailableWorkers())
98+
}
99+
100+
for range workerNum {
101+
executor.Submit(func() {})
102+
time.Sleep(time.Millisecond)
103+
}
104+
105+
if executor.AvailableWorkers() != 1 {
106+
t.Fatalf("executor.AvailableWorkers() %d != 1", executor.AvailableWorkers())
107+
}
108+
109+
for range workerNum * 2 {
110+
executor.Submit(func() {
111+
time.Sleep(time.Millisecond)
112+
})
113+
}
114+
115+
if executor.AvailableWorkers() != workerNum {
116+
t.Fatalf("executor.AvailableWorkers() %d != workerNum %d", executor.AvailableWorkers(), workerNum)
117+
}
118+
}

option.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -62,8 +62,8 @@ func WithSyncMutex() Option {
6262

6363
// WithRoundRobinScheduler sets the new scheduler function using round robin strategy.
6464
func WithRoundRobinScheduler() Option {
65-
newSchedulerFunc := func(workers []*worker) scheduler {
66-
return newRoundRobinScheduler(workers)
65+
newSchedulerFunc := func(workers ...*worker) scheduler {
66+
return newRoundRobinScheduler(workers...)
6767
}
6868

6969
return func(conf *config) {
@@ -73,8 +73,8 @@ func WithRoundRobinScheduler() Option {
7373

7474
// WithRandomScheduler sets the new scheduler function using random strategy.
7575
func WithRandomScheduler() Option {
76-
newSchedulerFunc := func(workers []*worker) scheduler {
77-
return newRandomScheduler(workers)
76+
newSchedulerFunc := func(workers ...*worker) scheduler {
77+
return newRandomScheduler(workers...)
7878
}
7979

8080
return func(conf *config) {

0 commit comments

Comments
 (0)