Skip to content

Commit c00bd80

Browse files
committed
chore: add some corner test
1 parent 767cad8 commit c00bd80

File tree

5 files changed

+138
-2
lines changed

5 files changed

+138
-2
lines changed

vermeer/apps/master/bl/task_bl.go

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -67,23 +67,30 @@ func (tb *TaskBl) CreateTaskInfo(
6767
// for scheduler
6868
taskInfo.Priority = 0
6969
taskInfo.Preorders = make([]int32, 0)
70-
taskInfo.Exclusive = false // default to false, can be set to true if needed
71-
// taskInfo.Exclusive = true
70+
taskInfo.Exclusive = true // default to true for now, can be set false by params
7271
if params != nil {
7372
if priority, ok := params["priority"]; ok {
7473
if p, err := strconv.ParseInt(priority, 10, 32); err == nil {
74+
if p < 0 {
75+
return nil, fmt.Errorf("priority should be non-negative")
76+
}
7577
taskInfo.Priority = int32(p)
7678
} else {
7779
logrus.Warnf("priority convert to int32 error:%v", err)
80+
return nil, err
7881
}
7982
}
8083
if preorders, ok := params["preorders"]; ok {
8184
preorderList := strings.Split(preorders, ",")
8285
for _, preorder := range preorderList {
8386
if pid, err := strconv.ParseInt(preorder, 10, 32); err == nil {
87+
if taskMgr.GetTaskByID(int32(pid)) == nil {
88+
return nil, fmt.Errorf("preorder task id %d not exists", pid)
89+
}
8490
taskInfo.Preorders = append(taskInfo.Preorders, int32(pid))
8591
} else {
8692
logrus.Warnf("preorder convert to int32 error:%v", err)
93+
return nil, err
8794
}
8895
}
8996
}
@@ -92,9 +99,14 @@ func (tb *TaskBl) CreateTaskInfo(
9299
taskInfo.Exclusive = ex
93100
} else {
94101
logrus.Warnf("exclusive convert to bool error:%v", err)
102+
return nil, err
95103
}
96104
}
97105
if cronExpr, ok := params["cron_expr"]; ok {
106+
if err := Scheduler.cronManager.CheckCronExpression(cronExpr); err != nil {
107+
logrus.Warnf("cron_expr parse error:%v", err)
108+
return nil, err
109+
}
98110
taskInfo.CronExpr = cronExpr
99111
}
100112
}

vermeer/test/functional/compute_base.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,19 @@ func (ctb *ComputeTaskBase) SendComputeReqAsyncNotWait(params map[string]string)
108108
return int32(resp.Task.ID)
109109
}
110110

111+
func (ctb *ComputeTaskBase) SendComputeReqAsyncNotWaitWithError(params map[string]string) (int32, error) {
112+
//create Compute Task
113+
resp, err := ctb.masterHttp.CreateTaskAsync(client.TaskCreateRequest{
114+
TaskType: "compute",
115+
GraphName: ctb.graphName,
116+
Params: params,
117+
})
118+
if err != nil {
119+
return -1, err
120+
}
121+
return int32(resp.Task.ID), nil
122+
}
123+
111124
func (ctb *ComputeTaskBase) SendComputeReqAsyncBatchPriority(params []map[string]string) ([]int32, []int32) {
112125
//create Compute Task
113126
tasks := make([]client.TaskInfo, 0, len(params))

vermeer/test/functional/compute_task.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ type ComputeTask interface {
3737
TaskComputeBody() map[string]string
3838
SendComputeReqAsync(params map[string]string)
3939
SendComputeReqAsyncNotWait(params map[string]string) int32
40+
SendComputeReqAsyncNotWaitWithError(params map[string]string) (int32, error)
4041
SendComputeReqAsyncBatchPriority(params []map[string]string) ([]int32, []int32)
4142
SendComputeReqSync(params map[string]string)
4243
LoadComputeRes() ([]interface{}, error)

vermeer/test/functional/http_interface.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,16 @@ func (ct CancelTask) CancelTask(t *testing.T, master *client.VermeerClient, grap
7676
require.Equal(t, "canceled", task.Task.Status)
7777
}
7878

79+
func (ct CancelTask) DirectCancelTask(t *testing.T, master *client.VermeerClient, taskID int32) {
80+
ok, err := master.GetTaskCancel(int(taskID))
81+
require.NoError(t, err)
82+
require.Equal(t, true, ok)
83+
84+
task, err := master.GetTask(int(taskID))
85+
require.NoError(t, err)
86+
require.Equal(t, "canceled", task.Task.Status)
87+
}
88+
7989
type GetGraphs struct {
8090
}
8191

vermeer/test/scheduler/priority.go

Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,11 @@ package scheduler
22

33
import (
44
"fmt"
5+
"sync"
56
"testing"
67
"time"
78

9+
"vermeer/apps/structure"
810
"vermeer/client"
911
"vermeer/test/functional"
1012

@@ -143,6 +145,100 @@ func SubTestDepends(t *testing.T, expectRes *functional.ExpectRes, healthCheck *
143145
fmt.Printf("Test Depends: %-30s [OK], cost: %v\n", computeTask, time.Since(bTime))
144146
}
145147

148+
// SubTestInvalidDependency 测试当任务依赖一个不存在的任务ID时,调度器的行为。
149+
// 调度器应该拒绝此任务,并返回一个错误。
150+
func SubTestInvalidDependency(t *testing.T, expectRes *functional.ExpectRes, healthCheck *functional.HealthCheck, masterHttp *client.VermeerClient, graphName []string, computeTask string, waitSecond int) {
151+
fmt.Printf("Test Invalid Dependency start with task: %s\n", computeTask)
152+
bTime := time.Now()
153+
154+
computeTest, err := functional.MakeComputeTask(computeTask)
155+
require.NoError(t, err)
156+
computeTest.Init(graphName[0], computeTask, expectRes, waitSecond, masterHttp, t, healthCheck)
157+
158+
taskBody := computeTest.TaskComputeBody()
159+
// 设置 preorders 为一个非常大的、理论上不存在的任务ID
160+
invalidTaskID := 999999999
161+
taskBody["preorders"] = fmt.Sprintf("%d", invalidTaskID)
162+
163+
logrus.Infof("Attempting to submit a task with invalid dependency on ID: %d", invalidTaskID)
164+
165+
// 尝试异步提交任务,并检查是否返回了错误
166+
taskID, err := computeTest.SendComputeReqAsyncNotWaitWithError(taskBody)
167+
168+
// 断言提交操作失败
169+
require.Error(t, err, "Submitting a task with a non-existent dependency should return an error.")
170+
// 断言返回的任务ID为0,或者其他表示失败的值
171+
require.Equal(t, int32(-1), taskID, "The task ID should be zero or invalid on failure.")
172+
173+
fmt.Printf("Test Invalid Dependency: %-30s [OK], cost: %v\n", computeTask, time.Since(bTime))
174+
}
175+
176+
func SubTestConcurrentCancellation(t *testing.T, expectRes *functional.ExpectRes, healthCheck *functional.HealthCheck, masterHttp *client.VermeerClient, graphName []string, computeTask string, waitSecond int) {
177+
fmt.Printf("Test Concurrent Cancellation start with task: %s\n", computeTask)
178+
bTime := time.Now()
179+
180+
computeTest, err := functional.MakeComputeTask(computeTask)
181+
require.NoError(t, err)
182+
computeTest.Init(graphName[0], computeTask, expectRes, waitSecond, masterHttp, t, healthCheck)
183+
184+
// 设置任务数量
185+
const numTasks = 20
186+
taskBodies := make([]map[string]string, numTasks)
187+
for i := 0; i < numTasks; i++ {
188+
taskBodies[i] = computeTest.TaskComputeBody()
189+
}
190+
191+
taskIDs := make(chan int32, numTasks)
192+
var wg sync.WaitGroup
193+
194+
// 1. 并发提交任务
195+
for i := 0; i < numTasks; i++ {
196+
wg.Add(1)
197+
go func(body map[string]string) {
198+
defer wg.Done()
199+
taskID := computeTest.SendComputeReqAsyncNotWait(body)
200+
if taskID != 0 {
201+
taskIDs <- taskID
202+
} else {
203+
logrus.Errorf("Failed to submit task: %v", err)
204+
}
205+
}(taskBodies[i])
206+
}
207+
208+
wg.Wait()
209+
close(taskIDs)
210+
211+
submittedTaskIDs := make([]int32, 0, numTasks)
212+
for id := range taskIDs {
213+
submittedTaskIDs = append(submittedTaskIDs, id)
214+
}
215+
216+
logrus.Infof("Submitted %d tasks concurrently: %+v", len(submittedTaskIDs), submittedTaskIDs)
217+
require.Equal(t, numTasks, len(submittedTaskIDs), "Not all tasks were successfully submitted.")
218+
219+
cancelTask := functional.CancelTask{}
220+
cancelTask.DirectCancelTask(t, masterHttp, submittedTaskIDs[len(submittedTaskIDs)-1])
221+
222+
// 3. 验证任务状态
223+
// 这里需要一个循环来检查所有任务的最终状态
224+
// 实际实现中,您可能需要根据调度器的API来轮询任务状态
225+
// 在这个示例中,我们只做基本的断言,因为没有实际的取消和状态查询逻辑
226+
logrus.Info("Waiting for tasks to settle...")
227+
time.Sleep(time.Duration(waitSecond) * time.Second)
228+
229+
checkTask, err := masterHttp.GetTask(int(submittedTaskIDs[numTasks-1]))
230+
231+
require.NoError(t, err, "Error fetching task status after cancellation.")
232+
require.NotNil(t, checkTask, "Task should exist after cancellation.")
233+
234+
if structure.TaskState(checkTask.Task.Status) != structure.TaskStateCanceled {
235+
logrus.Warn("No tasks were cancelled; check scheduler behavior.")
236+
require.Fail(t, "Expected at least some tasks to be cancelled.")
237+
}
238+
239+
fmt.Printf("Test Concurrent Cancellation: %-30s [OK], cost: %v\n", computeTask, time.Since(bTime))
240+
}
241+
146242
func TestPriority(t *testing.T, expectRes *functional.ExpectRes, healthCheck *functional.HealthCheck, masterHttp *client.VermeerClient, graphName []string, factor string, waitSecond int) {
147243
fmt.Print("start test priority\n")
148244

@@ -176,4 +272,8 @@ func TestPriority(t *testing.T, expectRes *functional.ExpectRes, healthCheck *fu
176272
// 6. send tasks to different graphs
177273
// expect: the tasks should be executed concurrently
178274
// have been tested in SubTestSmall and SubTestDepends
275+
276+
SubTestInvalidDependency(t, expectRes, healthCheck, masterHttp, graphName, computeTask, waitSecond)
277+
278+
SubTestConcurrentCancellation(t, expectRes, healthCheck, masterHttp, graphName, computeTask, 3)
179279
}

0 commit comments

Comments
 (0)