Skip to content

Commit ec5c1d2

Browse files
committed
chore: add some test for routine
1 parent 17b98ef commit ec5c1d2

File tree

6 files changed

+87
-46
lines changed

6 files changed

+87
-46
lines changed

vermeer/apps/master/bl/scheduler_bl.go

Lines changed: 28 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ func (s *ScheduleBl) Init() {
5959
s.algorithmManager = &schedules.SchedulerAlgorithmManager{}
6060
s.algorithmManager.Init()
6161
s.cronManager = &schedules.SchedulerCronManager{}
62-
s.cronManager.Init(s.QueueTask)
62+
s.cronManager.Init(s.QueueTaskFromTemplate)
6363
go s.startTicker()
6464
go s.waitingStartedTask()
6565
}
@@ -186,7 +186,7 @@ func (s *ScheduleBl) QueueTask(taskInfo *structure.TaskInfo) (bool, error) {
186186
return false, errors.New("the property `SpaceName` of taskInfo is empty")
187187
}
188188

189-
//defer s.Unlock(s.Lock())
189+
defer s.Unlock(s.Lock())
190190
if err := taskMgr.SetState(taskInfo, structure.TaskStateWaiting); err != nil {
191191
return false, err
192192
}
@@ -212,6 +212,30 @@ func (s *ScheduleBl) QueueTask(taskInfo *structure.TaskInfo) (bool, error) {
212212
return ok, nil
213213
}
214214

215+
func (s *ScheduleBl) QueueTaskFromTemplate(template *structure.TaskInfo) (int32, error) {
216+
if template == nil {
217+
return -1, errors.New("the argument `template` is nil")
218+
}
219+
220+
bc := &baseCreator{}
221+
taskInfo, err := bc.CopyTaskInfo(template)
222+
if err != nil {
223+
logrus.Errorf("failed to copy task info from template, template ID: %d, caused by: %v", template.ID, err)
224+
return -1, err
225+
}
226+
bc.saveTaskInfo(taskInfo)
227+
228+
ok, err := s.QueueTask(taskInfo)
229+
if err != nil || !ok {
230+
logrus.Errorf("failed to queue task from template, template ID: %d, caused by: %v", template.ID, err)
231+
return -1, err
232+
}
233+
234+
logrus.Infof("queued task '%d' from template '%d'", taskInfo.ID, template.ID)
235+
236+
return taskInfo.ID, nil
237+
}
238+
215239
func (s *ScheduleBl) BatchQueueTask(taskInfos []*structure.TaskInfo) ([]bool, []error) {
216240
if len(taskInfos) == 0 {
217241
return []bool{}, []error{}
@@ -220,7 +244,7 @@ func (s *ScheduleBl) BatchQueueTask(taskInfos []*structure.TaskInfo) ([]bool, []
220244
s.PauseDispatch()
221245

222246
defer s.ResumeDispatch()
223-
defer s.Unlock(s.Lock())
247+
// defer s.Unlock(s.Lock())
224248

225249
errors := make([]error, len(taskInfos))
226250
oks := make([]bool, len(taskInfos))
@@ -246,7 +270,7 @@ func (s *ScheduleBl) CloseCurrent(taskId int32, removeWorkerName ...string) erro
246270
s.resourceManager.ReleaseByTaskID(taskId)
247271

248272
if len(removeWorkerName) > 0 {
249-
// stop the cron job if exists
273+
// stop the cron job if exists when need remove worker, otherwise the task is just closed normally
250274
s.cronManager.DeleteTask(taskId)
251275
// remove the worker from resource manager
252276
workerName := removeWorkerName[0]

vermeer/apps/master/bl/task_bl.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@ func (tb *TaskBl) CreateTaskInfo(
6868
taskInfo.Priority = 0
6969
taskInfo.Preorders = make([]int32, 0)
7070
taskInfo.Exclusive = false // default to false, can be set to true if needed
71+
// taskInfo.Exclusive = true
7172
if params != nil {
7273
if priority, ok := params["priority"]; ok {
7374
if p, err := strconv.Atoi(priority); err == nil {

vermeer/apps/master/bl/task_creator.go

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,28 @@ func (bc *baseCreator) NewTaskInfo(graphName string, params map[string]string, t
9999
return task, nil
100100
}
101101

102+
func (bc *baseCreator) CopyTaskInfo(src *structure.TaskInfo) (*structure.TaskInfo, error) {
103+
if src == nil {
104+
return nil, fmt.Errorf("the argument `src` should not be nil")
105+
}
106+
107+
task, err := taskMgr.CreateTask(src.SpaceName, src.Type, 0)
108+
if err != nil {
109+
return nil, err
110+
}
111+
112+
task.CreateType = structure.TaskCreateAsync
113+
task.GraphName = src.GraphName
114+
task.CreateUser = src.CreateUser
115+
task.Params = src.Params
116+
task.CronExpr = "" // clear cron expression for the new task
117+
task.Priority = src.Priority
118+
task.Preorders = src.Preorders
119+
task.Exclusive = src.Exclusive
120+
121+
return task, nil
122+
}
123+
102124
func (bc *baseCreator) saveTaskInfo(task *structure.TaskInfo) (*structure.TaskInfo, error) {
103125
if _, err := taskMgr.AddTask(task); err != nil {
104126
logrus.Errorf("failed to add a task to `TaskManager`, task: %v, cased by: %v", task, err)

vermeer/apps/master/schedules/scheduler_cron_manager.go

Lines changed: 7 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -11,14 +11,14 @@ import (
1111
type SchedulerCronManager struct {
1212
cronTasks map[int32][]*structure.TaskInfo // cron expression to TaskInfo. Origin task ID to copied tasks
1313
crons map[int32][]*cron.Cron // cron expression to cron jobs
14-
// queueHandler is a function that handles the task queue
15-
queueHandler func(*structure.TaskInfo) (bool, error)
14+
// queueTemplateHandler is a function that handles the task queue
15+
queueTemplateHandler func(*structure.TaskInfo) (int32, error)
1616
}
1717

18-
func (t *SchedulerCronManager) Init(queueHandler func(*structure.TaskInfo) (bool, error)) *SchedulerCronManager {
18+
func (t *SchedulerCronManager) Init(queueTemplateHandler func(*structure.TaskInfo) (int32, error)) *SchedulerCronManager {
1919
t.cronTasks = make(map[int32][]*structure.TaskInfo)
2020
t.crons = make(map[int32][]*cron.Cron)
21-
t.queueHandler = queueHandler
21+
t.queueTemplateHandler = queueTemplateHandler
2222
return t
2323
}
2424

@@ -50,28 +50,14 @@ func (t *SchedulerCronManager) AddCronTask(taskInfo *structure.TaskInfo) error {
5050
return
5151
}
5252

53-
// TODO: CREATE a new task from the original task, using taskbl
53+
// CREATE a new task from the original task, using taskbl, it is handled in queueTemplateHandler
5454
// copy a new taskInfo
55-
task, err := structure.TaskManager.CreateTask(taskInfo.SpaceName, taskInfo.Type, 0)
56-
task.CreateType = structure.TaskCreateAsync
57-
task.GraphName = taskInfo.GraphName
58-
task.CreateUser = taskInfo.CreateUser
59-
task.Params = taskInfo.Params
60-
task.CronExpr = "" // clear cron expression for the new task
61-
task.Priority = taskInfo.Priority
62-
task.Preorders = taskInfo.Preorders
63-
task.Exclusive = taskInfo.Exclusive
55+
newID, err := t.queueTemplateHandler(taskInfo)
6456
if err != nil {
65-
logrus.Errorf("Failed to create task from cron job for task %d: %v", taskInfo.ID, err)
66-
return
67-
}
68-
structure.TaskManager.AddTask(task)
69-
structure.TaskManager.SaveTask(task.ID)
70-
if _, err := t.queueHandler(task); err != nil {
7157
logrus.Errorf("Failed to queue task %d in cron job: %v", taskInfo.ID, err)
7258
return
7359
}
74-
logrus.Infof("Successfully queued task %d from cron job", task.ID)
60+
logrus.Infof("Successfully queued task %d from cron job", newID)
7561
})
7662
if err != nil {
7763
logrus.Errorf("Failed to add cron job for task %d: %v", taskInfo.ID, err)

vermeer/test/scheduler/priority.go

Lines changed: 21 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -146,34 +146,34 @@ func SubTestDepends(t *testing.T, expectRes *functional.ExpectRes, healthCheck *
146146
func TestPriority(t *testing.T, expectRes *functional.ExpectRes, healthCheck *functional.HealthCheck, masterHttp *client.VermeerClient, graphName []string, factor string, waitSecond int) {
147147
fmt.Print("start test priority\n")
148148

149-
// // for scheduler, just test a simple task
150-
// var computeTask = "pagerank"
149+
// for scheduler, just test a simple task
150+
var computeTask = "pagerank"
151151

152-
// // TEST GROUP: PRIORITY
153-
// // 1. send priority tasks to single graph
154-
// // expect: the tasks should be executed in order of priority
152+
// TEST GROUP: PRIORITY
153+
// 1. send priority tasks to single graph
154+
// expect: the tasks should be executed in order of priority
155155

156-
// SubTestPriority(t, expectRes, healthCheck, masterHttp, graphName, computeTask, waitSecond)
156+
SubTestPriority(t, expectRes, healthCheck, masterHttp, graphName, computeTask, waitSecond)
157157

158-
// // 2. send small tasks and large tasks to single graph
159-
// // expect: the small tasks should be executed first
158+
// 2. send small tasks and large tasks to single graph
159+
// expect: the small tasks should be executed first
160160

161-
// SubTestSmall(t, expectRes, healthCheck, masterHttp, graphName, computeTask, waitSecond)
161+
SubTestSmall(t, expectRes, healthCheck, masterHttp, graphName, computeTask, waitSecond)
162162

163-
// // 3. send support concurrent tasks to single graph
164-
// // expect: the tasks should be executed concurrently
165-
// SubTestConcurrent(t, expectRes, healthCheck, masterHttp, graphName, computeTask, waitSecond)
163+
// 3. send support concurrent tasks to single graph
164+
// expect: the tasks should be executed concurrently
165+
SubTestConcurrent(t, expectRes, healthCheck, masterHttp, graphName, computeTask, waitSecond)
166166

167-
// // 4. send dependency-tasks to single graph
168-
// // expect: the tasks should be executed in order of dependency
167+
// 4. send dependency-tasks to single graph
168+
// expect: the tasks should be executed in order of dependency
169169

170-
// SubTestDepends(t, expectRes, healthCheck, masterHttp, graphName, computeTask, waitSecond)
170+
SubTestDepends(t, expectRes, healthCheck, masterHttp, graphName, computeTask, waitSecond)
171171

172-
// // 5. send same priority tasks to single graph
173-
// // expect: the tasks should be executed in order of time
174-
// // skipped, too fragile
172+
// 5. send same priority tasks to single graph
173+
// expect: the tasks should be executed in order of time
174+
// skipped, too fragile
175175

176-
// // 6. send tasks to different graphs
177-
// // expect: the tasks should be executed concurrently
178-
// // have been tested in SubTestSmall and SubTestDepends
176+
// 6. send tasks to different graphs
177+
// expect: the tasks should be executed concurrently
178+
// have been tested in SubTestSmall and SubTestDepends
179179
}

vermeer/test/scheduler/routine.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,14 @@ func SubTestRoutine(t *testing.T, expectRes *functional.ExpectRes, healthCheck *
3030
// wait for a while and check again
3131
time.Sleep(2 * time.Minute)
3232

33+
// check if deployed
34+
queue := []int32{}
35+
queue = append(queue, int32(taskid+1))
36+
result, err := masterHttp.GetTaskStartSequence(queue)
37+
require.NoError(t, err)
38+
require.Equal(t, 1, len(result.Sequence))
39+
require.Greater(t, result.Sequence[0], int32(0))
40+
3341
masterHttp.GetTaskCancel(int(taskid))
3442

3543
fmt.Printf("Test Routine: %-30s [OK], cost: %v\n", computeTask, time.Since(bTime))

0 commit comments

Comments
 (0)