Skip to content

Commit 617cddc

Browse files
committed
handle delay pending task on start
1 parent 47db511 commit 617cddc

File tree

1 file changed

+15
-16
lines changed

1 file changed

+15
-16
lines changed

manager/task-manager.go

Lines changed: 15 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ import (
44
"container/heap"
55
"encoding/json"
66
"fmt"
7-
"log"
87
"sync"
98
"time"
109

@@ -96,7 +95,7 @@ func (tm *TaskManager) AddNewTask(task *model.Task) {
9695
Time: delayTime,
9796
})
9897
} else {
99-
go tm.assignTask(task, true)
98+
go tm.assignTask(task)
10099
}
101100
}
102101
}
@@ -125,7 +124,7 @@ func (tm *TaskManager) receiveNewTask() {
125124
Time: delayTime,
126125
})
127126
} else {
128-
go tm.assignTask(task, true)
127+
go tm.assignTask(task)
129128
}
130129
}
131130
}
@@ -187,18 +186,9 @@ func (tm *TaskManager) receiveServerJoinMessage() {
187186
}
188187
}
189188

190-
func (tm *TaskManager) assignTask(task *model.Task, isNewTask bool, oldTaskId ...string) {
189+
func (tm *TaskManager) assignTask(task *model.Task) {
191190
var id string = ""
192-
var err error = nil
193-
if isNewTask {
194-
id = task.Id
195-
} else if len(oldTaskId) > 0 {
196-
id = oldTaskId[0]
197-
}
198-
if err != nil {
199-
log.Printf("error saving task %v\n", err)
200-
return
201-
}
191+
id = task.Id
202192
taskWeight, ok := tm.tasksWeight[task.Meta.TaskType]
203193
var minServer *model.Servers
204194
minLoadVal := 10000000
@@ -228,7 +218,7 @@ func (tm *TaskManager) delayTaskTicker() {
228218
if taskI != nil {
229219
task := taskI.(*DelayTask)
230220
if task.Time-time.Now().Unix() <= 0 {
231-
go tm.assignTask(task.Task, true)
221+
go tm.assignTask(task.Task)
232222
} else {
233223
tm.priorityQueue.Push(task)
234224
}
@@ -261,8 +251,17 @@ func (tm *TaskManager) assignPendingTasks() {
261251
for _, pendingTask := range pendingTasks {
262252
var task = model.Task{
263253
Meta: pendingTask.Meta,
254+
Id: pendingTask.Id,
255+
}
256+
if pendingTask.Meta.Delay > 0 {
257+
delayTime := time.Now().Unix() + int64(task.Meta.Delay)*60
258+
tm.priorityQueue.Push(&DelayTask{
259+
Task: &task,
260+
Time: delayTime,
261+
})
262+
} else {
263+
tm.assignTask(&task)
264264
}
265-
tm.assignTask(&task, false, pendingTask.Id)
266265
}
267266
}
268267
}

0 commit comments

Comments
 (0)