Skip to content

Commit 5fe495c

Browse files
authored
Merge pull request #2826 from semaphoreui/fix_tasks_order
fix(be): tasks order when max parallel tasks provided
2 parents 8fdc8f5 + 0ff128f commit 5fe495c

File tree

3 files changed

+114
-65
lines changed

3 files changed

+114
-65
lines changed

services/tasks/TaskPool.go

Lines changed: 86 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"github.com/semaphoreui/semaphore/pkg/tz"
88
"github.com/semaphoreui/semaphore/services/tasks/stage_parsers"
99
"regexp"
10+
"slices"
1011
"strconv"
1112
"strings"
1213
"time"
@@ -25,9 +26,18 @@ type logRecord struct {
2526
time time.Time
2627
}
2728

28-
type resourceLock struct {
29-
lock bool
30-
holder *TaskRunner
29+
type EventType uint
30+
31+
const (
32+
EventTypeNew EventType = 0
33+
EventTypeFinished EventType = 1
34+
EventTypeFailed EventType = 2
35+
EventTypeEmpty EventType = 3
36+
)
37+
38+
type PoolEvent struct {
39+
eventType EventType
40+
task *TaskRunner
3141
}
3242

3343
type TaskPool struct {
@@ -48,7 +58,7 @@ type TaskPool struct {
4858

4959
store db.Store
5060

51-
resourceLocker chan *resourceLock
61+
queueEvents chan PoolEvent
5262

5363
aliases map[string]*TaskRunner
5464
}
@@ -214,41 +224,10 @@ func (p *TaskPool) Run() {
214224
ticker := time.NewTicker(5 * time.Second)
215225

216226
defer func() {
217-
close(p.resourceLocker)
218227
ticker.Stop()
219228
}()
220229

221-
// Lock or unlock resources when running a TaskRunner
222-
go func(locker <-chan *resourceLock) {
223-
for l := range locker {
224-
t := l.holder
225-
226-
if l.lock {
227-
if p.blocks(t) {
228-
panic("Trying to lock an already locked resource!")
229-
}
230-
231-
projTasks, ok := p.activeProj[t.Task.ProjectID]
232-
if !ok {
233-
projTasks = make(map[int]*TaskRunner)
234-
p.activeProj[t.Task.ProjectID] = projTasks
235-
}
236-
projTasks[t.Task.ID] = t
237-
p.RunningTasks[t.Task.ID] = t
238-
continue
239-
}
240-
241-
if p.activeProj[t.Task.ProjectID] != nil && p.activeProj[t.Task.ProjectID][t.Task.ID] != nil {
242-
delete(p.activeProj[t.Task.ProjectID], t.Task.ID)
243-
if len(p.activeProj[t.Task.ProjectID]) == 0 {
244-
delete(p.activeProj, t.Task.ProjectID)
245-
}
246-
}
247-
248-
delete(p.RunningTasks, t.Task.ID)
249-
delete(p.aliases, t.Alias)
250-
}
251-
}(p.resourceLocker)
230+
go p.handleQueue()
252231

253232
for {
254233
select {
@@ -290,43 +269,87 @@ func (p *TaskPool) Run() {
290269
case task := <-p.register: // new task created by API or schedule
291270

292271
db.StoreSession(p.store, "new task", func() {
293-
p.Queue = append(p.Queue, task)
272+
//p.Queue = append(p.Queue, task)
294273
log.Debug(task)
295274
msg := "Task " + strconv.Itoa(task.Task.ID) + " added to queue"
296275
task.Log(msg)
297276
log.Info(msg)
298277
task.saveStatus()
299278
})
279+
p.queueEvents <- PoolEvent{EventTypeNew, task}
300280

301281
case <-ticker.C: // timer 5 seconds
302-
if len(p.Queue) == 0 {
303-
break
304-
}
282+
p.queueEvents <- PoolEvent{EventTypeEmpty, nil}
283+
284+
}
285+
}
286+
}
305287

306-
//get TaskRunner from top of queue
307-
t := p.Queue[0]
308-
if t.Task.Status == task_logger.TaskFailStatus {
288+
func (p *TaskPool) handleQueue() {
289+
for t := range p.queueEvents {
290+
switch t.eventType {
291+
case EventTypeNew:
292+
p.Queue = append(p.Queue, t.task)
293+
case EventTypeFinished:
294+
p.onTaskStop(t.task)
295+
}
296+
297+
if len(p.Queue) == 0 {
298+
continue
299+
}
300+
301+
var i = 0
302+
for i < len(p.Queue) {
303+
curr := p.Queue[i]
304+
305+
if curr.Task.Status == task_logger.TaskFailStatus {
309306
//delete failed TaskRunner from queue
310-
p.Queue = p.Queue[1:]
311-
log.Info("Task " + strconv.Itoa(t.Task.ID) + " removed from queue")
312-
break
307+
p.Queue = slices.Delete(p.Queue, i, i+1)
308+
log.Info("Task " + strconv.Itoa(curr.Task.ID) + " removed from queue")
309+
continue
313310
}
314311

315-
if p.blocks(t) {
316-
//move blocked TaskRunner to end of queue
317-
p.Queue = append(p.Queue[1:], t)
318-
break
312+
if p.blocks(curr) {
313+
i = i + 1
314+
continue
319315
}
320316

321-
log.Info("Set resource locker with TaskRunner " + strconv.Itoa(t.Task.ID))
322-
p.resourceLocker <- &resourceLock{lock: true, holder: t}
317+
p.Queue = slices.Delete(p.Queue, i, i+1)
318+
runTask(curr, p)
319+
}
320+
}
321+
}
322+
323+
func runTask(task *TaskRunner, p *TaskPool) {
324+
log.Info("Set resource locker with TaskRunner " + strconv.Itoa(task.Task.ID))
325+
326+
p.onTaskRun(task)
323327

324-
go t.run()
328+
log.Info("Task " + strconv.Itoa(task.Task.ID) + " started")
329+
go task.run()
330+
}
325331

326-
p.Queue = p.Queue[1:]
327-
log.Info("Task " + strconv.Itoa(t.Task.ID) + " removed from queue")
332+
func (p *TaskPool) onTaskRun(t *TaskRunner) {
333+
projTasks, ok := p.activeProj[t.Task.ProjectID]
334+
if !ok {
335+
projTasks = make(map[int]*TaskRunner)
336+
p.activeProj[t.Task.ProjectID] = projTasks
337+
}
338+
projTasks[t.Task.ID] = t
339+
p.RunningTasks[t.Task.ID] = t
340+
p.aliases[t.Alias] = t
341+
}
342+
343+
func (p *TaskPool) onTaskStop(t *TaskRunner) {
344+
if p.activeProj[t.Task.ProjectID] != nil && p.activeProj[t.Task.ProjectID][t.Task.ID] != nil {
345+
delete(p.activeProj[t.Task.ProjectID], t.Task.ID)
346+
if len(p.activeProj[t.Task.ProjectID]) == 0 {
347+
delete(p.activeProj, t.Task.ProjectID)
328348
}
329349
}
350+
351+
delete(p.RunningTasks, t.Task.ID)
352+
delete(p.aliases, t.Alias)
330353
}
331354

332355
func (p *TaskPool) blocks(t *TaskRunner) bool {
@@ -366,14 +389,14 @@ func (p *TaskPool) blocks(t *TaskRunner) bool {
366389

367390
func CreateTaskPool(store db.Store) TaskPool {
368391
return TaskPool{
369-
Queue: make([]*TaskRunner, 0), // queue of waiting tasks
370-
register: make(chan *TaskRunner), // add TaskRunner to queue
371-
activeProj: make(map[int]map[int]*TaskRunner),
372-
RunningTasks: make(map[int]*TaskRunner), // working tasks
373-
logger: make(chan logRecord, 10000), // store log records to database
374-
store: store,
375-
resourceLocker: make(chan *resourceLock),
376-
aliases: make(map[string]*TaskRunner),
392+
Queue: make([]*TaskRunner, 0), // queue of waiting tasks
393+
register: make(chan *TaskRunner), // add TaskRunner to queue
394+
activeProj: make(map[int]map[int]*TaskRunner),
395+
RunningTasks: make(map[int]*TaskRunner), // working tasks
396+
logger: make(chan logRecord, 10000), // store log records to database
397+
store: store,
398+
queueEvents: make(chan PoolEvent),
399+
aliases: make(map[string]*TaskRunner),
377400
}
378401
}
379402

@@ -536,7 +559,6 @@ func (p *TaskPool) AddTask(
536559

537560
if needAlias {
538561
taskRunner.Alias = random.String(32)
539-
p.aliases[taskRunner.Alias] = &taskRunner
540562
}
541563

542564
err = taskRunner.populateDetails()

services/tasks/TaskRunner.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -149,12 +149,12 @@ func (t *TaskRunner) run() {
149149
defer func() {
150150
log.Info("Stopped running TaskRunner " + strconv.Itoa(t.Task.ID))
151151
log.Info("Release resource locker with TaskRunner " + strconv.Itoa(t.Task.ID))
152-
t.pool.resourceLocker <- &resourceLock{lock: false, holder: t}
153152

154153
now := tz.Now()
155154
t.Task.End = &now
156155
t.saveStatus()
157156
t.createTaskEvent()
157+
t.pool.queueEvents <- PoolEvent{EventTypeFinished, t}
158158
}()
159159

160160
// Mark task as stopped if user stopped task during preparation (before task run).

util/debug.go

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
package util
2+
3+
import (
4+
log "github.com/sirupsen/logrus"
5+
"runtime"
6+
"strconv"
7+
"strings"
8+
)
9+
10+
func Goid() (int, error) {
11+
var buf [64]byte
12+
n := runtime.Stack(buf[:], false)
13+
idField := strings.Fields(strings.TrimPrefix(string(buf[:n]), "goroutine "))[0]
14+
id, err := strconv.Atoi(idField)
15+
if err != nil {
16+
log.Debug("Cannot get goroutine id: ", err)
17+
return -1, err
18+
}
19+
return id, nil
20+
}
21+
22+
func LogGoid(msg string) {
23+
id, err := Goid()
24+
if err == nil {
25+
log.Info(msg, ", goid=", id)
26+
}
27+
}

0 commit comments

Comments
 (0)