Skip to content

Commit 02b83cc

Browse files
committed
chore: change some files/ai suggestions
1 parent ec5c1d2 commit 02b83cc

File tree

10 files changed

+76
-41
lines changed

10 files changed

+76
-41
lines changed

vermeer/apps/master/bl/grpc_handlers.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -100,11 +100,15 @@ func (h *ServerHandler) SayHelloMaster(ctx context.Context, req *pb.HelloMasterR
100100
}
101101

102102
_, err = workerMgr.AddWorker(reqWorker)
103-
Scheduler.ChangeWorkerStatus(reqWorker.Name, schedules.WorkerOngoingStatusIdle)
104103
if err != nil {
105104
logrus.Errorf("failed to add a WorkerClient to the WorkerManager, error: %s", err)
106105
return &pb.HelloMasterResp{}, err
107106
}
107+
_, err = Scheduler.ChangeWorkerStatus(reqWorker.Name, schedules.WorkerOngoingStatusIdle)
108+
if err != nil {
109+
logrus.Errorf("failed to change worker status to idle, error: %s", err)
110+
return &pb.HelloMasterResp{}, err
111+
}
108112

109113
logrus.Infof("worker say hello name: %s and set to workgroup: %s, client: %s", reqWorker.Name, reqWorker.Group, p.Addr.String())
110114

vermeer/apps/master/bl/scheduler_bl.go

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -113,22 +113,24 @@ func (s *ScheduleBl) startTicker() {
113113
}
114114

115115
// this makes the scheduler manager try to schedule the next tasks
116-
func (s *ScheduleBl) TryScheduleNextTasks() {
116+
func (s *ScheduleBl) TryScheduleNextTasks(noLock ...bool) {
117117
defer func() {
118118
if err := recover(); err != nil {
119119
logrus.Errorln("TryScheduleNextTasks() has been recovered:", err)
120120
}
121121
}()
122122

123-
if err := s.tryScheduleInner(s.softSchedule); err != nil {
123+
if err := s.tryScheduleInner(s.softSchedule, noLock...); err != nil {
124124
logrus.Errorf("do scheduling error:%v", err)
125125
}
126126
}
127127

128128
// Main routine to schedule tasks
129-
func (s *ScheduleBl) tryScheduleInner(softSchedule bool) error {
129+
func (s *ScheduleBl) tryScheduleInner(softSchedule bool, noLock ...bool) error {
130130
// Implement logic to get the next task in the queue for the given space
131-
defer s.Unlock(s.Lock())
131+
if !(len(noLock) > 0 && noLock[0]) {
132+
defer s.Unlock(s.Lock())
133+
}
132134

133135
// step 1: make sure all tasks have been allocated to a worker group
134136
// This is done by the TaskManager, which assigns a worker group to each task
@@ -264,6 +266,8 @@ func (s *ScheduleBl) BatchQueueTask(taskInfos []*structure.TaskInfo) ([]bool, []
264266
// ******** CloseCurrent ********
265267

266268
func (s *ScheduleBl) CloseCurrent(taskId int32, removeWorkerName ...string) error {
269+
defer s.Unlock(s.Lock())
270+
267271
// trace tasks need these workers, check if these tasks are available
268272
s.taskManager.RemoveTask(taskId)
269273
// release the worker group
@@ -282,24 +286,27 @@ func (s *ScheduleBl) CloseCurrent(taskId int32, removeWorkerName ...string) erro
282286
}
283287

284288
logrus.Infof("invoke dispatch when task '%d' is closed", taskId)
285-
s.TryScheduleNextTasks()
289+
s.TryScheduleNextTasks(true)
286290
return nil
287291
}
288292

289-
func (s *ScheduleBl) ChangeWorkerStatus(workerName string, status schedules.WorkerOngoingStatus) {
293+
func (s *ScheduleBl) ChangeWorkerStatus(workerName string, status schedules.WorkerOngoingStatus) (bool, error) {
294+
defer s.Unlock(s.Lock())
290295
s.resourceManager.ChangeWorkerStatus(workerName, status)
291296

292297
logrus.Infof("worker '%s' status changed to '%s'", workerName, status)
293298
// After changing the worker status, we may need to reschedule tasks
294-
s.TryScheduleNextTasks()
299+
s.TryScheduleNextTasks(true)
300+
301+
return true, nil
295302
}
296303

297304
// ******** START TASK ********
298305
func (s *ScheduleBl) waitingStartedTask() {
299306
for taskInfo := range s.startChan {
300307
if taskInfo == nil {
301308
logrus.Warnf("recieved a nil task from startChan")
302-
return
309+
continue
303310
}
304311

305312
logrus.Infof("chan received task '%d' to start", taskInfo.ID)
@@ -388,6 +395,8 @@ func (s *ScheduleBl) CancelTask(taskInfo *structure.TaskInfo) error {
388395
return errors.New("the argument `taskInfo` is nil")
389396
}
390397

398+
defer s.Unlock(s.Lock())
399+
391400
isHeadTask := s.taskManager.IsTaskOngoing(taskInfo.ID)
392401
task := s.taskManager.RemoveTask(taskInfo.ID)
393402
s.cronManager.DeleteTask(taskInfo.ID)

vermeer/apps/master/bl/task_bl.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ func (tb *TaskBl) CreateTaskInfo(
7171
// taskInfo.Exclusive = true
7272
if params != nil {
7373
if priority, ok := params["priority"]; ok {
74-
if p, err := strconv.Atoi(priority); err == nil {
74+
if p, err := strconv.ParseInt(priority, 10, 32); err == nil {
7575
taskInfo.Priority = int32(p)
7676
} else {
7777
logrus.Warnf("priority convert to int32 error:%v", err)
@@ -80,7 +80,7 @@ func (tb *TaskBl) CreateTaskInfo(
8080
if preorders, ok := params["preorders"]; ok {
8181
preorderList := strings.Split(preorders, ",")
8282
for _, preorder := range preorderList {
83-
if pid, err := strconv.Atoi(preorder); err == nil {
83+
if pid, err := strconv.ParseInt(preorder, 10, 32); err == nil {
8484
taskInfo.Preorders = append(taskInfo.Preorders, int32(pid))
8585
} else {
8686
logrus.Warnf("preorder convert to int32 error:%v", err)

vermeer/apps/master/schedules/scheduler_algorithm_manager.go

Lines changed: 34 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package schedules
22

33
import (
4+
"slices"
45
"sort"
56
"strconv"
67
"time"
@@ -23,13 +24,13 @@ type SchedulerAlgorithm interface {
2324

2425
type SchedulerAlgorithmManager struct {
2526
filteredSchedulerAlgorithms map[string]SchedulerAlgorithm
26-
schuduledSchedulerAlgorithms map[string]SchedulerAlgorithm
27+
scheduledSchedulerAlgorithms map[string]SchedulerAlgorithm
2728
dispatchPaused bool
2829
}
2930

3031
func (am *SchedulerAlgorithmManager) Init() {
3132
am.filteredSchedulerAlgorithms = make(map[string]SchedulerAlgorithm)
32-
am.schuduledSchedulerAlgorithms = make(map[string]SchedulerAlgorithm)
33+
am.scheduledSchedulerAlgorithms = make(map[string]SchedulerAlgorithm)
3334
am.dispatchPaused = false
3435
// Register filter and schedule algorithms
3536
am.RegisterFilterAlgorithm(&DependsSchedulerAlgorithm{})
@@ -38,33 +39,33 @@ func (am *SchedulerAlgorithmManager) Init() {
3839
am.RegisterSchedulerAlgorithm(&PriorityElderSchedulerAlgorithm{})
3940
}
4041

41-
func (am *SchedulerAlgorithmManager) RegisterSchedulerAlgorithm(SchedulerAlgorithm SchedulerAlgorithm) {
42-
if SchedulerAlgorithm == nil {
42+
func (am *SchedulerAlgorithmManager) RegisterSchedulerAlgorithm(schedulerAlgorithm SchedulerAlgorithm) {
43+
if schedulerAlgorithm == nil {
4344
return
4445
}
45-
name := SchedulerAlgorithm.Name()
46-
if _, exists := am.schuduledSchedulerAlgorithms[name]; exists {
46+
name := schedulerAlgorithm.Name()
47+
if _, exists := am.scheduledSchedulerAlgorithms[name]; exists {
4748
return // SchedulerAlgorithm already registered
4849
}
4950

5051
// only support one scheduling algorithm for now
51-
if len(am.schuduledSchedulerAlgorithms) > 0 {
52+
if len(am.scheduledSchedulerAlgorithms) > 0 {
5253
return // Only one scheduling algorithm can be registered
5354
}
54-
SchedulerAlgorithm.Init()
55-
am.schuduledSchedulerAlgorithms[name] = SchedulerAlgorithm
55+
schedulerAlgorithm.Init()
56+
am.scheduledSchedulerAlgorithms[name] = schedulerAlgorithm
5657
}
5758

58-
func (am *SchedulerAlgorithmManager) RegisterFilterAlgorithm(FilterAlgorithm SchedulerAlgorithm) {
59-
if FilterAlgorithm == nil {
59+
func (am *SchedulerAlgorithmManager) RegisterFilterAlgorithm(filterAlgorithm SchedulerAlgorithm) {
60+
if filterAlgorithm == nil {
6061
return
6162
}
62-
name := FilterAlgorithm.Name()
63+
name := filterAlgorithm.Name()
6364
if _, exists := am.filteredSchedulerAlgorithms[name]; exists {
6465
return // SchedulerAlgorithm already registered
6566
}
66-
FilterAlgorithm.Init()
67-
am.filteredSchedulerAlgorithms[name] = FilterAlgorithm
67+
filterAlgorithm.Init()
68+
am.filteredSchedulerAlgorithms[name] = filterAlgorithm
6869
}
6970

7071
func (am *SchedulerAlgorithmManager) IsDispatchPaused() bool {
@@ -98,7 +99,7 @@ func (am *SchedulerAlgorithmManager) ScheduleNextTasks(allTasks []*structure.Tas
9899

99100
// only support one scheduling algorithm for now
100101
// get first algorithm
101-
for _, algorithm := range am.schuduledSchedulerAlgorithms {
102+
for _, algorithm := range am.scheduledSchedulerAlgorithms {
102103
tasks, err := algorithm.ScheduleNextTasks(filteredTasks, taskToWorkerGroupMap, idleWorkerGroups, concurrentWorkerGroups, softSchedule)
103104
if err != nil {
104105
return nil, err
@@ -268,7 +269,12 @@ func (p *PriorityElderSchedulerAlgorithm) CalculateTaskEmergency(task *structure
268269
priorityCost := priorityParam * int64(task.Priority)
269270
// step 3: resource cost
270271
graph := structure.GraphManager.GetGraphByName(task.SpaceName, task.GraphName)
271-
resourceCost := resourceParam / max(1, graph.VertexCount+graph.EdgeCount) // Avoid division by zero, ensure at least 1
272+
resourceCost := int64(0)
273+
if graph == nil {
274+
resourceCost = resourceParam // if graph not found, use max resource cost
275+
} else {
276+
resourceCost = resourceParam / max(1, graph.VertexCount+graph.EdgeCount) // Avoid division by zero, ensure at least 1
277+
}
272278
// step 4: some random value
273279
randomValue := int64(randomValueParam) // Placeholder for any random value logic
274280
if printValue {
@@ -282,13 +288,19 @@ func (p *PriorityElderSchedulerAlgorithm) ScheduleNextTasks(allTasks []*structur
282288
return nil, nil // No tasks to schedule
283289
}
284290

291+
// calculate emergency value for each task
292+
taskEmergencies := make(map[int32]int64)
293+
for _, task := range allTasks {
294+
taskEmergencies[task.ID] = p.CalculateTaskEmergency(task, taskToWorkerGroupMap, false)
295+
}
296+
285297
// Sort tasks by priority (higher priority first)
286298
sort.Slice(allTasks, func(i, j int) bool {
287-
return p.CalculateTaskEmergency(allTasks[i], taskToWorkerGroupMap, false) > p.CalculateTaskEmergency(allTasks[j], taskToWorkerGroupMap, false)
299+
return taskEmergencies[allTasks[i].ID] > taskEmergencies[allTasks[j].ID]
288300
})
289301

290302
for _, task := range allTasks {
291-
logrus.Debugf("Task %d: Emergency Value: %d", task.ID, p.CalculateTaskEmergency(task, taskToWorkerGroupMap, true))
303+
logrus.Debugf("Task %d: Emergency Value: %d", task.ID, taskEmergencies[task.ID])
292304
}
293305

294306
for _, task := range allTasks {
@@ -427,19 +439,17 @@ func (d *DependsSchedulerAlgorithm) ScheduleNextTasks(allTasks []*structure.Task
427439
// Check if all dependencies are satisfied
428440
allDepsSatisfied := true
429441
for _, dep := range depends {
430-
if depTask, exists := allTaskIDs[dep]; !exists || depTask.State != structure.TaskStateWaiting {
442+
if depTask, exists := allTaskIDs[dep]; exists && depTask.State != structure.TaskStateComplete {
431443
allDepsSatisfied = false
432444
break
433445
}
434446
}
435447
if allDepsSatisfied {
436448
if group, exists := taskToWorkerGroupMap[task.ID]; exists && group != "" {
437449
// only support idle worker groups for now
438-
for _, idleGroup := range idleWorkerGroups {
439-
if group == idleGroup {
440-
logrus.Debugf("Task %d is assigned to worker group %s", task.ID, group)
441-
return []*structure.TaskInfo{task}, nil // Return the first task that can be scheduled
442-
}
450+
if slices.Contains(idleWorkerGroups, group) {
451+
logrus.Debugf("Task %d is assigned to worker group %s", task.ID, group)
452+
return []*structure.TaskInfo{task}, nil // Return the first task that can be scheduled
443453
}
444454
}
445455
}

vermeer/apps/master/schedules/scheduler_cron_manager.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,6 @@ func (t *SchedulerCronManager) AddCronTask(taskInfo *structure.TaskInfo) error {
4343
}
4444

4545
// add to cron tasks
46-
t.cronTasks[taskInfo.ID] = append(t.cronTasks[taskInfo.ID], taskInfo)
4746
cronJob := cron.New()
4847
_, err := cronJob.AddFunc(taskInfo.CronExpr, func() {
4948
if taskInfo == nil {
@@ -63,6 +62,7 @@ func (t *SchedulerCronManager) AddCronTask(taskInfo *structure.TaskInfo) error {
6362
logrus.Errorf("Failed to add cron job for task %d: %v", taskInfo.ID, err)
6463
return err
6564
}
65+
t.cronTasks[taskInfo.ID] = append(t.cronTasks[taskInfo.ID], taskInfo)
6666
t.crons[taskInfo.ID] = append(t.crons[taskInfo.ID], cronJob)
6767
cronJob.Start()
6868
logrus.Infof("Added cron task for task ID %d with expression %s", taskInfo.ID, taskInfo.CronExpr)

vermeer/apps/master/schedules/scheduler_task_manager.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
)
99

1010
type SchedulerTaskManager struct {
11+
structure.MutexLocker
1112
// This struct is responsible for managing tasks in the scheduling system.
1213
// A map from task ID to TaskInfo can be used to track tasks.
1314
allTaskMap map[int32]*structure.TaskInfo
@@ -32,6 +33,8 @@ func (t *SchedulerTaskManager) QueueTask(taskInfo *structure.TaskInfo) (bool, er
3233
return false, errors.New("the property `SpaceName` of taskInfo is empty")
3334
}
3435

36+
defer t.Unlock(t.Lock())
37+
3538
// Add the task to the task map
3639
t.allTaskMap[taskInfo.ID] = taskInfo
3740
t.allTaskQueue = append(t.allTaskQueue, taskInfo)
@@ -53,6 +56,13 @@ func (t *SchedulerTaskManager) RemoveTask(taskID int32) error {
5356
return errors.New("task not found")
5457
}
5558
delete(t.allTaskMap, taskID)
59+
// remove from queue
60+
for i, task := range t.allTaskQueue {
61+
if task.ID == taskID {
62+
t.allTaskQueue = append(t.allTaskQueue[:i], t.allTaskQueue[i+1:]...)
63+
break
64+
}
65+
}
5666
delete(t.taskToworkerGroupMap, taskID)
5767
return nil
5868
}

vermeer/config/master.ini

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
; limitations under the License.
1515

1616
[default]
17-
log_level=debug
17+
log_level=info
1818
debug_mode=release
1919
http_peer=0.0.0.0:6688
2020
grpc_peer=0.0.0.0:6689

vermeer/test/functional/compute_base.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -140,7 +140,7 @@ func (ctb *ComputeTaskBase) SendComputeReqAsyncBatchPriority(params []map[string
140140
// If the Compute Task started successfully, poll tasksGet and parse the response; break once the status is complete.
141141
var taskResp *client.TaskResponse
142142
var err error
143-
for i := 0; i < ctb.waitSecond; i++ {
143+
for attempt := 0; attempt < ctb.waitSecond; attempt++ {
144144
ctb.healthCheck.DoHealthCheck()
145145
taskResp, err = ctb.masterHttp.GetTask(ctb.taskID)
146146
require.NoError(ctb.t, err)

vermeer/test/functional/load_local.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,8 @@ func (lt *LoadTaskLocal) TaskLoadBody() map[string]string {
4747
}
4848
}
4949

50+
// TaskLoadBodyWithNum creates load configuration with specified number of files.
51+
// If num <= 10, it will be automatically adjusted to 30 to ensure minimum test coverage.
5052
func (lt *LoadTaskLocal) TaskLoadBodyWithNum(num int) map[string]string {
5153
vertexBackends := []string{"db", "mem"}
5254

vermeer/vermeer_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -111,9 +111,9 @@ func testScheduler(t *testing.T) {
111111

112112
func testAlgorithms(t *testing.T) {
113113
// todo: add algorithm names
114-
// var computeTasks = []string{"pagerank", "lpa", "wcc", "degree_out", "degree_in", "degree_both", "triangle_count",
115-
// "sssp", "closeness_centrality", "betweenness_centrality", "kcore", "jaccard", "ppr", "clustering_coefficient", "scc", "louvain"}
116-
var computeTasks = []string{"pagerank"}
114+
var computeTasks = []string{"pagerank", "lpa", "wcc", "degree_out", "degree_in", "degree_both", "triangle_count",
115+
"sssp", "closeness_centrality", "betweenness_centrality", "kcore", "jaccard", "ppr", "clustering_coefficient", "scc", "louvain"}
116+
// var computeTasks = []string{"pagerank"}
117117

118118
startTime := time.Now()
119119
expectRes, err := functional.GetExpectRes(expectResPath)

0 commit comments

Comments
 (0)