Skip to content

Commit 2d69114

Browse files
committed
feat: pref pool
1 parent 088e0f1 commit 2d69114

File tree

3 files changed

+15
-68
lines changed

3 files changed

+15
-68
lines changed

pool.go

Lines changed: 6 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -20,12 +20,8 @@ func defaultTaskPoolConfig() *taskPoolConfig {
2020
}
2121
}
2222

23-
type task struct {
24-
Executor func()
25-
}
26-
2723
type taskPool struct {
28-
tasks chan *task
24+
tasks chan func()
2925
workers int32
3026
mu sync.RWMutex
3127
shutdown int32
@@ -49,7 +45,7 @@ func newTaskPool(config *taskPoolConfig) *taskPool {
4945

5046
ctx, cancel := context.WithCancel(context.Background())
5147
pool := &taskPool{
52-
tasks: make(chan *task, config.QueueSize),
48+
tasks: make(chan func(), config.QueueSize),
5349
workers: int32(config.Workers),
5450
workerCtx: ctx,
5551
workerCancel: cancel,
@@ -71,11 +67,11 @@ func (p *taskPool) worker() {
7167

7268
for {
7369
select {
74-
case task, ok := <-p.tasks:
70+
case fn, ok := <-p.tasks:
7571
if !ok {
7672
return
7773
}
78-
if task == nil {
74+
if fn == nil {
7975
continue
8076
}
8177

@@ -86,7 +82,7 @@ func (p *taskPool) worker() {
8682
// Prevent worker crash
8783
}
8884
}()
89-
task.Executor()
85+
fn()
9086
}()
9187

9288
case <-p.workerCtx.Done():
@@ -101,7 +97,7 @@ func (p *taskPool) Submit(executor func()) error {
10197
}
10298

10399
select {
104-
case p.tasks <- &task{Executor: executor}:
100+
case p.tasks <- executor:
105101
return nil
106102
case <-p.workerCtx.Done():
107103
return ErrManagerStopped

promise_mgr.go

Lines changed: 8 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
package promise
22

33
import (
4-
"context"
54
"runtime"
65
"sync"
76
"sync/atomic"
@@ -19,7 +18,7 @@ func DefaultPromiseMgrConfig() *PromiseMgrConfig {
1918
executorWorkers := cpuCount * 2
2019
microtaskWorkers := cpuCount
2120

22-
microtaskQueueSize := 2000 + microtaskWorkers*200
21+
microtaskQueueSize := 5000 + microtaskWorkers*500
2322

2423
return &PromiseMgrConfig{
2524
ExecutorWorkers: executorWorkers,
@@ -30,15 +29,11 @@ func DefaultPromiseMgrConfig() *PromiseMgrConfig {
3029
}
3130

3231
type PromiseMgr struct {
33-
executorPool *taskPool
34-
microtaskPool *taskPool
35-
mu sync.RWMutex
36-
shutdown int32
37-
config *PromiseMgrConfig
38-
microtaskSched chan func()
39-
schedulerWg sync.WaitGroup
40-
schedulerCtx context.Context
41-
schedulerCancel context.CancelFunc
32+
executorPool *taskPool
33+
microtaskPool *taskPool
34+
mu sync.RWMutex
35+
shutdown int32
36+
config *PromiseMgrConfig
4237
}
4338

4439
func NewPromiseMgr() *PromiseMgr {
@@ -53,7 +48,6 @@ func NewPromiseMgrWithConfig(config *PromiseMgrConfig) *PromiseMgr {
5348
defaultConfig := DefaultPromiseMgrConfig()
5449
normalizeConfig(config, defaultConfig)
5550

56-
ctx, cancel := context.WithCancel(context.Background())
5751
mgr := &PromiseMgr{
5852
executorPool: newTaskPool(&taskPoolConfig{
5953
Workers: config.ExecutorWorkers,
@@ -63,14 +57,9 @@ func NewPromiseMgrWithConfig(config *PromiseMgrConfig) *PromiseMgr {
6357
Workers: config.MicrotaskWorkers,
6458
QueueSize: config.MicrotaskQueueSize,
6559
}),
66-
config: config,
67-
microtaskSched: make(chan func(), 10000),
68-
schedulerCtx: ctx,
69-
schedulerCancel: cancel,
60+
config: config,
7061
}
7162

72-
mgr.startMicrotaskScheduler()
73-
7463
return mgr
7564
}
7665

@@ -99,45 +88,12 @@ func (m *PromiseMgr) scheduleExecutor(executor func()) error {
9988
return m.executorPool.Submit(executor)
10089
}
10190

102-
func (m *PromiseMgr) startMicrotaskScheduler() {
103-
m.schedulerWg.Add(1)
104-
go func() {
105-
defer m.schedulerWg.Done()
106-
for {
107-
select {
108-
case fn := <-m.microtaskSched:
109-
if fn == nil {
110-
return
111-
}
112-
_ = m.microtaskPool.Submit(fn)
113-
case <-m.schedulerCtx.Done():
114-
return
115-
}
116-
}
117-
}()
118-
}
119-
12091
func (m *PromiseMgr) scheduleMicrotask(fn func()) error {
12192
if atomic.LoadInt32(&m.shutdown) == 1 {
12293
return ErrManagerStopped
12394
}
12495

125-
select {
126-
case m.microtaskSched <- fn:
127-
return nil
128-
case <-m.schedulerCtx.Done():
129-
return ErrManagerStopped
130-
default:
131-
go func() {
132-
defer func() {
133-
if r := recover(); r != nil {
134-
// Prevent goroutine crash
135-
}
136-
}()
137-
fn()
138-
}()
139-
return nil
140-
}
96+
return m.microtaskPool.Submit(fn)
14197
}
14298

14399
// SetMicrotaskConfig updates microtask configuration
@@ -211,10 +167,6 @@ func (m *PromiseMgr) Close() {
211167
m.mu.Lock()
212168
defer m.mu.Unlock()
213169

214-
m.schedulerCancel()
215-
close(m.microtaskSched)
216-
m.schedulerWg.Wait()
217-
218170
m.executorPool.Close()
219171
m.microtaskPool.Close()
220172
}
@@ -235,7 +187,6 @@ func (m *PromiseMgr) IsShutdown() bool {
235187

236188
// WaitForShutdown waits for shutdown completion
237189
func (m *PromiseMgr) WaitForShutdown() {
238-
m.schedulerWg.Wait()
239190
m.executorPool.WaitForShutdown()
240191
m.microtaskPool.WaitForShutdown()
241192
}

promise_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -491,7 +491,7 @@ func TestRejectFunction(t *testing.T) {
491491
func TestMicrotaskConfig(t *testing.T) {
492492
// Test default config
493493
defaultConfig := DefaultPromiseMgrConfig()
494-
expectedQueueSize := 2000 + runtime.NumCPU()*200
494+
expectedQueueSize := 5000 + runtime.NumCPU()*500
495495
if defaultConfig.MicrotaskQueueSize != expectedQueueSize {
496496
t.Errorf("Expected default MicrotaskQueueSize %d, got %d", expectedQueueSize, defaultConfig.MicrotaskQueueSize)
497497
}

0 commit comments

Comments
 (0)