Skip to content

Commit 52c138c

Browse files
authored
Refactor build queue (#1190)
1 parent 05a81ac commit 52c138c

File tree

1 file changed: 85 additions and 30 deletions

server/build_queue.go

Lines changed: 85 additions & 30 deletions
Original file line number | Diff line number | Diff line change
@@ -3,6 +3,7 @@ package server
33
import (
44
"container/list"
55
"sync"
6+
"sync/atomic"
67
"time"
78
)
89

@@ -14,10 +15,11 @@ var taskPool = sync.Pool{
1415

1516
// BuildQueue schedules build tasks of esm.sh
1617
type BuildQueue struct {
17-
lock sync.Mutex
18-
tasks map[string]*BuildTask
19-
queue *list.List
20-
chann uint16
18+
lock sync.Mutex
19+
tasks map[string]*BuildTask
20+
queue *list.List
21+
chann uint16
22+
scheduler int32 // scheduler state (0=not running, 1=running)
2123
}
2224

2325
type BuildTask struct {
@@ -35,6 +37,9 @@ type BuildOutput struct {
3537
}
3638

3739
func NewBuildQueue(concurrency int) *BuildQueue {
40+
if concurrency <= 0 {
41+
concurrency = 1 // ensure at least 1 concurrent task
42+
}
3843
return &BuildQueue{
3944
queue: list.New(),
4045
tasks: map[string]*BuildTask{},
@@ -65,35 +70,83 @@ func (q *BuildQueue) Add(ctx *BuildContext) chan BuildOutput {
6570
task.el = q.queue.PushBack(task)
6671
q.tasks[ctx.Path()] = task
6772

68-
go q.schedule()
73+
// start scheduler if not already running
74+
if atomic.CompareAndSwapInt32(&q.scheduler, 0, 1) {
75+
go q.schedule()
76+
}
6977

7078
return ch
7179
}
7280

7381
func (q *BuildQueue) schedule() {
74-
q.lock.Lock()
75-
defer q.lock.Unlock()
76-
77-
var task *BuildTask
78-
if q.chann > 0 {
79-
for el := q.queue.Front(); el != nil; el = el.Next() {
80-
t, ok := el.Value.(*BuildTask)
81-
if ok && t.pending {
82-
task = t
83-
break
82+
defer func() {
83+
// ensure scheduler state is reset even if panic occurs
84+
if r := recover(); r != nil {
85+
// TODO: log panic
86+
}
87+
atomic.StoreInt32(&q.scheduler, 0)
88+
}()
89+
90+
for {
91+
q.lock.Lock()
92+
93+
var task *BuildTask
94+
if q.chann > 0 {
95+
for el := q.queue.Front(); el != nil; el = el.Next() {
96+
t, ok := el.Value.(*BuildTask)
97+
if ok && t.pending {
98+
task = t
99+
break
100+
}
84101
}
85102
}
86-
}
87103

88-
if task != nil {
89-
q.chann -= 1
90-
task.pending = false
91-
task.startedAt = time.Now()
92-
go q.run(task)
104+
if task != nil {
105+
q.chann -= 1
106+
task.pending = false
107+
task.startedAt = time.Now()
108+
q.lock.Unlock()
109+
110+
go q.run(task)
111+
} else {
112+
// no more tasks to schedule, exit the loop
113+
q.lock.Unlock()
114+
break
115+
}
93116
}
94117
}
95118

96119
func (q *BuildQueue) run(task *BuildTask) {
120+
defer func() {
121+
// ensure channel counter is always incremented, even if panic occurs
122+
if r := recover(); r != nil {
123+
// TODO: log panic
124+
}
125+
126+
q.lock.Lock()
127+
q.chann += 1
128+
129+
// check if there are more pending tasks and restart scheduler if needed
130+
// do this check atomically while holding the lock
131+
hasPending := false
132+
for el := q.queue.Front(); el != nil; el = el.Next() {
133+
if t, ok := el.Value.(*BuildTask); ok && t.pending {
134+
hasPending = true
135+
break
136+
}
137+
}
138+
139+
// only restart scheduler if we're not already running one
140+
if hasPending && atomic.CompareAndSwapInt32(&q.scheduler, 0, 1) {
141+
// start scheduler in a separate goroutine to avoid deadlock
142+
go func() {
143+
q.schedule()
144+
}()
145+
}
146+
147+
q.lock.Unlock()
148+
}()
149+
97150
meta, err := task.ctx.Build()
98151
if err != nil {
99152
// another shot if failed to resolve build entry
@@ -112,16 +165,17 @@ func (q *BuildQueue) run(task *BuildTask) {
112165
task.ctx.logger.Errorf("build '%s': %v", task.ctx.Path(), err)
113166
}
114167

168+
// clean up task while holding the lock to prevent race conditions
115169
q.lock.Lock()
116170
q.queue.Remove(task.el)
117171
delete(q.tasks, task.ctx.Path())
118172
if task.ctx.rawPath != "" {
119173
// the `Build` function may have changed the path
120174
delete(q.tasks, task.ctx.rawPath)
121175
}
122-
q.chann += 1
123176
q.lock.Unlock()
124177

178+
// store wait channels before recycling the task
125179
waitChans := task.waitChans
126180

127181
// recycle the task object
@@ -133,16 +187,17 @@ func (q *BuildQueue) run(task *BuildTask) {
133187
task.pending = false
134188
taskPool.Put(task)
135189

136-
// schedule next task if have any
137-
go q.schedule()
138-
139-
// send the bulid output
190+
// send the build output
140191
output := BuildOutput{meta, err}
141192
for _, ch := range waitChans {
142-
select {
143-
case ch <- output:
144-
default:
145-
// drop
193+
if ch != nil {
194+
select {
195+
case ch <- output:
196+
// successfully sent
197+
default:
198+
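// no receiver is ready and the buffer has no free slot: the output is dropped without logging (a send on a closed channel would panic rather than reach this case)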
199+
// Note: task.ctx might be nil here due to recycling, so we can't log the path
200+
}
146201
}
147202
}
148203
}

0 commit comments

Comments
 (0)