Skip to content

Commit 17b98ef

Browse files
committed
chore: add some test for routine
1 parent aca2994 commit 17b98ef

File tree

5 files changed

+77
-8
lines changed

5 files changed

+77
-8
lines changed

vermeer/apps/master/bl/scheduler_bl.go

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -244,10 +244,11 @@ func (s *ScheduleBl) CloseCurrent(taskId int32, removeWorkerName ...string) erro
244244
s.taskManager.RemoveTask(taskId)
245245
// release the worker group
246246
s.resourceManager.ReleaseByTaskID(taskId)
247-
// stop the cron job if exists
248-
s.cronManager.DeleteTask(taskId)
249247

250248
if len(removeWorkerName) > 0 {
249+
// stop the cron job if exists
250+
s.cronManager.DeleteTask(taskId)
251+
// remove the worker from resource manager
251252
workerName := removeWorkerName[0]
252253
if workerName == "" {
253254
return errors.New("the argument `removeWorkerName` is empty")
@@ -408,6 +409,16 @@ func (s *ScheduleBl) handleCancelTask(taskInfo *structure.TaskInfo) error {
408409
return nil
409410
}
410411

412+
func (s *ScheduleBl) CancelCronTask(taskInfo *structure.TaskInfo) error {
413+
if taskInfo == nil {
414+
return errors.New("the argument `taskInfo` is nil")
415+
}
416+
417+
s.cronManager.DeleteTask(taskInfo.ID)
418+
419+
return nil
420+
}
421+
411422
// ** Other Methods **
412423

413424
func (s *ScheduleBl) PeekSpaceTail(space string) *structure.TaskInfo {

vermeer/apps/master/bl/task_bl.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -182,6 +182,9 @@ func (tb *TaskBl) CancelTask(taskID int32) error {
182182
return fmt.Errorf("cannot cancel the task with id '%v' as it was not created by you", taskID)
183183
}
184184

185+
// stop the cron job if exists
186+
Scheduler.CancelCronTask(task)
187+
185188
if task.State == structure.TaskStateCanceled {
186189
return fmt.Errorf("task had been in state canceled")
187190
}

vermeer/apps/master/schedules/scheduler_cron_manager.go

Lines changed: 49 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ func (t *SchedulerCronManager) CheckCronExpression(cronExpr string) error {
2727
return errors.New("cron expression is empty")
2828
}
2929
if _, err := cron.ParseStandard(cronExpr); err != nil {
30+
logrus.Errorf("Failed to parse cron expression: %v", err)
3031
return errors.New("invalid cron expression: " + err.Error())
3132
}
3233
return nil
@@ -41,22 +42,44 @@ func (t *SchedulerCronManager) AddCronTask(taskInfo *structure.TaskInfo) error {
4142
return errors.New("the property `CronExpr` of taskInfo is empty")
4243
}
4344

45+
// add to cron tasks
4446
t.cronTasks[taskInfo.ID] = append(t.cronTasks[taskInfo.ID], taskInfo)
4547
cronJob := cron.New()
4648
_, err := cronJob.AddFunc(taskInfo.CronExpr, func() {
4749
if taskInfo == nil {
4850
return
4951
}
50-
if _, err := t.queueHandler(taskInfo); err != nil {
52+
53+
// TODO: CREATE a new task from the original task, using taskbl
54+
// copy a new taskInfo
55+
task, err := structure.TaskManager.CreateTask(taskInfo.SpaceName, taskInfo.Type, 0)
56+
task.CreateType = structure.TaskCreateAsync
57+
task.GraphName = taskInfo.GraphName
58+
task.CreateUser = taskInfo.CreateUser
59+
task.Params = taskInfo.Params
60+
task.CronExpr = "" // clear cron expression for the new task
61+
task.Priority = taskInfo.Priority
62+
task.Preorders = taskInfo.Preorders
63+
task.Exclusive = taskInfo.Exclusive
64+
if err != nil {
65+
logrus.Errorf("Failed to create task from cron job for task %d: %v", taskInfo.ID, err)
66+
return
67+
}
68+
structure.TaskManager.AddTask(task)
69+
structure.TaskManager.SaveTask(task.ID)
70+
if _, err := t.queueHandler(task); err != nil {
5171
logrus.Errorf("Failed to queue task %d in cron job: %v", taskInfo.ID, err)
5272
return
5373
}
74+
logrus.Infof("Successfully queued task %d from cron job", task.ID)
5475
})
5576
if err != nil {
5677
logrus.Errorf("Failed to add cron job for task %d: %v", taskInfo.ID, err)
5778
return err
5879
}
5980
t.crons[taskInfo.ID] = append(t.crons[taskInfo.ID], cronJob)
81+
cronJob.Start()
82+
logrus.Infof("Added cron task for task ID %d with expression %s", taskInfo.ID, taskInfo.CronExpr)
6083
return nil
6184
}
6285

@@ -73,3 +96,28 @@ func (t *SchedulerCronManager) DeleteTask(taskID int32) error {
7396
logrus.Infof("Deleted cron task for task ID %d", taskID)
7497
return nil
7598
}
99+
100+
func (t *SchedulerCronManager) DeleteTaskByGraph(spaceName, graphName string) error {
101+
if spaceName == "" || graphName == "" {
102+
return errors.New("the argument `spaceName` or `graphName` is empty")
103+
}
104+
105+
var toDelete []int32
106+
for taskID, tasks := range t.cronTasks {
107+
for _, task := range tasks {
108+
if task.SpaceName == spaceName && task.GraphName == graphName {
109+
toDelete = append(toDelete, taskID)
110+
break
111+
}
112+
}
113+
}
114+
115+
for _, taskID := range toDelete {
116+
if err := t.DeleteTask(taskID); err != nil {
117+
logrus.Errorf("Failed to delete cron task for task ID %d: %v", taskID, err)
118+
return err
119+
}
120+
}
121+
logrus.Infof("Deleted cron tasks for space %s and graph %s", spaceName, graphName)
122+
return nil
123+
}

vermeer/test/scheduler/priority.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,7 @@ func SubTestConcurrent(t *testing.T, expectRes *functional.ExpectRes, healthChec
8686

8787
// send two tasks with different size
8888
params := make([]map[string]string, 0)
89+
// default is false, actually do not need to set
8990
taskComputeBody["exclusive"] = "false"
9091
params = append(params, taskComputeBody)
9192
params = append(params, taskComputeBody)

vermeer/test/scheduler/routine.go

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,22 +19,28 @@ func SubTestRoutine(t *testing.T, expectRes *functional.ExpectRes, healthCheck *
1919
computeTest.Init(graphName[0], computeTask, expectRes, waitSecond, masterHttp, t, healthCheck)
2020
taskComputeBody := computeTest.TaskComputeBody()
2121

22-
taskComputeBody["cron_expr"] = "*/1 * * * * *" // every second
22+
// every 1 minute
23+
taskComputeBody["cron_expr"] = "* * * * *"
2324

2425
logrus.Infof("params for routine test: %+v", taskComputeBody)
2526

26-
computeTest.SendComputeReqAsync(taskComputeBody)
27-
computeTest.CheckRes()
27+
taskid := computeTest.SendComputeReqAsyncNotWait(taskComputeBody)
28+
// computeTest.CheckRes()
2829

2930
// wait for a while and check again
30-
time.Sleep(10 * time.Second)
31+
time.Sleep(2 * time.Minute)
32+
33+
masterHttp.GetTaskCancel(int(taskid))
34+
3135
fmt.Printf("Test Routine: %-30s [OK], cost: %v\n", computeTask, time.Since(bTime))
3236
}
3337

3438
func TestRoutine(t *testing.T, expectRes *functional.ExpectRes, healthCheck *functional.HealthCheck, masterHttp *client.VermeerClient, graphName []string, factor string, waitSecond int) {
39+
var computeTask = "pagerank"
40+
3541
// TEST GROUP: ROUTINE
3642
// 1. send tasks to single graph
3743
// expect: the tasks should be executed timely
3844

39-
SubTestRoutine(t, expectRes, healthCheck, masterHttp, graphName, factor, waitSecond)
45+
SubTestRoutine(t, expectRes, healthCheck, masterHttp, graphName, computeTask, waitSecond)
4046
}

0 commit comments

Comments
 (0)