Skip to content

Commit 0ff128f

Browse files
committed
Task pool refactoring
1 parent f2ce5db commit 0ff128f

File tree

3 files changed

+112
-88
lines changed

3 files changed

+112
-88
lines changed

services/tasks/TaskPool.go

Lines changed: 84 additions & 87 deletions
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,18 @@ type logRecord struct {
2424
time time.Time
2525
}
2626

27-
type resourceLock struct {
28-
lock bool
29-
holder *TaskRunner
27+
type EventType uint
28+
29+
const (
30+
EventTypeNew EventType = 0
31+
EventTypeFinished EventType = 1
32+
EventTypeFailed EventType = 2
33+
EventTypeEmpty EventType = 3
34+
)
35+
36+
type PoolEvent struct {
37+
eventType EventType
38+
task *TaskRunner
3039
}
3140

3241
type TaskPool struct {
@@ -47,7 +56,7 @@ type TaskPool struct {
4756

4857
store db.Store
4958

50-
resourceLocker chan *resourceLock
59+
queueEvents chan PoolEvent
5160

5261
aliases map[string]*TaskRunner
5362
}
@@ -100,41 +109,10 @@ func (p *TaskPool) Run() {
100109
ticker := time.NewTicker(5 * time.Second)
101110

102111
defer func() {
103-
close(p.resourceLocker)
104112
ticker.Stop()
105113
}()
106114

107-
// Lock or unlock resources when running a TaskRunner
108-
go func(locker <-chan *resourceLock) {
109-
for l := range locker {
110-
t := l.holder
111-
112-
if l.lock {
113-
if p.blocks(t) {
114-
panic("Trying to lock an already locked resource!")
115-
}
116-
117-
projTasks, ok := p.activeProj[t.Task.ProjectID]
118-
if !ok {
119-
projTasks = make(map[int]*TaskRunner)
120-
p.activeProj[t.Task.ProjectID] = projTasks
121-
}
122-
projTasks[t.Task.ID] = t
123-
p.RunningTasks[t.Task.ID] = t
124-
continue
125-
}
126-
127-
if p.activeProj[t.Task.ProjectID] != nil && p.activeProj[t.Task.ProjectID][t.Task.ID] != nil {
128-
delete(p.activeProj[t.Task.ProjectID], t.Task.ID)
129-
if len(p.activeProj[t.Task.ProjectID]) == 0 {
130-
delete(p.activeProj, t.Task.ProjectID)
131-
}
132-
}
133-
134-
delete(p.RunningTasks, t.Task.ID)
135-
delete(p.aliases, t.Alias)
136-
}
137-
}(p.resourceLocker)
115+
go p.handleQueue()
138116

139117
for {
140118
select {
@@ -153,67 +131,87 @@ func (p *TaskPool) Run() {
153131
case task := <-p.register: // new task created by API or schedule
154132

155133
db.StoreSession(p.store, "new task", func() {
156-
p.Queue = append(p.Queue, task)
134+
//p.Queue = append(p.Queue, task)
157135
log.Debug(task)
158136
msg := "Task " + strconv.Itoa(task.Task.ID) + " added to queue"
159137
task.Log(msg)
160138
log.Info(msg)
161139
task.saveStatus()
162140
})
141+
p.queueEvents <- PoolEvent{EventTypeNew, task}
163142

164143
case <-ticker.C: // timer 5 seconds
165-
if len(p.Queue) == 0 {
166-
break
167-
}
144+
p.queueEvents <- PoolEvent{EventTypeEmpty, nil}
168145

169-
var t *TaskRunner
146+
}
147+
}
148+
}
170149

171-
for i := range p.Queue {
172-
curr := p.Queue[i]
150+
func (p *TaskPool) handleQueue() {
151+
for t := range p.queueEvents {
152+
switch t.eventType {
153+
case EventTypeNew:
154+
p.Queue = append(p.Queue, t.task)
155+
case EventTypeFinished:
156+
p.onTaskStop(t.task)
157+
}
173158

174-
if curr.Task.Status == task_logger.TaskFailStatus {
175-
//delete failed TaskRunner from queue
176-
p.Queue = slices.Delete(p.Queue, i, i+1)
177-
log.Info("Task " + strconv.Itoa(curr.Task.ID) + " removed from queue")
178-
continue
179-
}
159+
if len(p.Queue) == 0 {
160+
continue
161+
}
180162

181-
if p.blocks(curr) {
182-
continue
183-
}
163+
var i = 0
164+
for i < len(p.Queue) {
165+
curr := p.Queue[i]
184166

185-
t = curr
186-
break
167+
if curr.Task.Status == task_logger.TaskFailStatus {
168+
//delete failed TaskRunner from queue
169+
p.Queue = slices.Delete(p.Queue, i, i+1)
170+
log.Info("Task " + strconv.Itoa(curr.Task.ID) + " removed from queue")
171+
continue
187172
}
188173

189-
if t == nil {
190-
break
174+
if p.blocks(curr) {
175+
i = i + 1
176+
continue
191177
}
192178

193-
////get TaskRunner from top of queue
194-
//t := p.Queue[0]
195-
//if t.Task.Status == task_logger.TaskFailStatus {
196-
// //delete failed TaskRunner from queue
197-
// p.Queue = p.Queue[1:]
198-
// log.Info("Task " + strconv.Itoa(t.Task.ID) + " removed from queue")
199-
// break
200-
//}
201-
//
202-
//if p.blocks(t) {
203-
// //move blocked TaskRunner to end of queue
204-
// //p.Queue = append(p.Queue[1:], t)
205-
// break
206-
//}
207-
208-
log.Info("Set resource locker with TaskRunner " + strconv.Itoa(t.Task.ID))
209-
p.resourceLocker <- &resourceLock{lock: true, holder: t}
210-
211-
go t.run()
212-
213-
p.Queue = p.Queue[1:]
214-
log.Info("Task " + strconv.Itoa(t.Task.ID) + " removed from queue")
179+
p.Queue = slices.Delete(p.Queue, i, i+1)
180+
runTask(curr, p)
181+
}
182+
}
183+
}
184+
185+
func runTask(task *TaskRunner, p *TaskPool) {
186+
log.Info("Set resource locker with TaskRunner " + strconv.Itoa(task.Task.ID))
187+
188+
p.onTaskRun(task)
189+
190+
log.Info("Task " + strconv.Itoa(task.Task.ID) + " started")
191+
go task.run()
192+
}
193+
194+
func (p *TaskPool) onTaskRun(t *TaskRunner) {
195+
projTasks, ok := p.activeProj[t.Task.ProjectID]
196+
if !ok {
197+
projTasks = make(map[int]*TaskRunner)
198+
p.activeProj[t.Task.ProjectID] = projTasks
199+
}
200+
projTasks[t.Task.ID] = t
201+
p.RunningTasks[t.Task.ID] = t
202+
p.aliases[t.Alias] = t
203+
}
204+
205+
func (p *TaskPool) onTaskStop(t *TaskRunner) {
206+
if p.activeProj[t.Task.ProjectID] != nil && p.activeProj[t.Task.ProjectID][t.Task.ID] != nil {
207+
delete(p.activeProj[t.Task.ProjectID], t.Task.ID)
208+
if len(p.activeProj[t.Task.ProjectID]) == 0 {
209+
delete(p.activeProj, t.Task.ProjectID)
215210
}
216211
}
212+
213+
delete(p.RunningTasks, t.Task.ID)
214+
delete(p.aliases, t.Alias)
217215
}
218216

219217
func (p *TaskPool) blocks(t *TaskRunner) bool {
@@ -247,14 +245,14 @@ func (p *TaskPool) blocks(t *TaskRunner) bool {
247245

248246
func CreateTaskPool(store db.Store) TaskPool {
249247
return TaskPool{
250-
Queue: make([]*TaskRunner, 0), // queue of waiting tasks
251-
register: make(chan *TaskRunner), // add TaskRunner to queue
252-
activeProj: make(map[int]map[int]*TaskRunner),
253-
RunningTasks: make(map[int]*TaskRunner), // working tasks
254-
logger: make(chan logRecord, 10000), // store log records to database
255-
store: store,
256-
resourceLocker: make(chan *resourceLock),
257-
aliases: make(map[string]*TaskRunner),
248+
Queue: make([]*TaskRunner, 0), // queue of waiting tasks
249+
register: make(chan *TaskRunner), // add TaskRunner to queue
250+
activeProj: make(map[int]map[int]*TaskRunner),
251+
RunningTasks: make(map[int]*TaskRunner), // working tasks
252+
logger: make(chan logRecord, 10000), // store log records to database
253+
store: store,
254+
queueEvents: make(chan PoolEvent),
255+
aliases: make(map[string]*TaskRunner),
258256
}
259257
}
260258

@@ -410,7 +408,6 @@ func (p *TaskPool) AddTask(taskObj db.Task, userID *int, projectID int, needAlia
410408

411409
if needAlias {
412410
taskRunner.Alias = random.String(32)
413-
p.aliases[taskRunner.Alias] = &taskRunner
414411
}
415412

416413
err = taskRunner.populateDetails()

services/tasks/TaskRunner.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -109,12 +109,12 @@ func (t *TaskRunner) run() {
109109
defer func() {
110110
log.Info("Stopped running TaskRunner " + strconv.Itoa(t.Task.ID))
111111
log.Info("Release resource locker with TaskRunner " + strconv.Itoa(t.Task.ID))
112-
t.pool.resourceLocker <- &resourceLock{lock: false, holder: t}
113112

114113
now := time.Now()
115114
t.Task.End = &now
116115
t.saveStatus()
117116
t.createTaskEvent()
117+
t.pool.queueEvents <- PoolEvent{EventTypeFinished, t}
118118
}()
119119

120120
// Mark task as stopped if user stopped task during preparation (before task run).

util/debug.go

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
package util
2+
3+
import (
4+
log "github.com/sirupsen/logrus"
5+
"runtime"
6+
"strconv"
7+
"strings"
8+
)
9+
10+
func Goid() (int, error) {
11+
var buf [64]byte
12+
n := runtime.Stack(buf[:], false)
13+
idField := strings.Fields(strings.TrimPrefix(string(buf[:n]), "goroutine "))[0]
14+
id, err := strconv.Atoi(idField)
15+
if err != nil {
16+
log.Debug("Cannot get goroutine id: ", err)
17+
return -1, err
18+
}
19+
return id, nil
20+
}
21+
22+
func LogGoid(msg string) {
23+
id, err := Goid()
24+
if err == nil {
25+
log.Info(msg, ", goid=", id)
26+
}
27+
}

0 commit comments

Comments
 (0)