
Commit 767cad8

chore: change getting all task to complete/ refresh group assign
1 parent: 02b83cc

4 files changed: +64 -6 lines

vermeer/apps/master/bl/compute_task.go

Lines changed: 2 additions & 0 deletions

@@ -142,6 +142,8 @@ func (ctb *ComputeTaskBl) ComputeTaskStatus(
 		}
 	}
 	taskMgr.ForceState(computeTask.Task, structure.TaskStateComplete)
+	// for scheduler, mark task complete
+	Scheduler.taskManager.MarkTaskComplete(taskId)
 	graph.SubUsingNum()
 	computeTask.FreeMemory()
 	needQuery := options.GetInt(computeTask.Task.Params, "output.need_query") == 1

vermeer/apps/master/bl/load_task.go

Lines changed: 3 additions & 0 deletions

@@ -204,6 +204,9 @@ func (lb *LoadTaskBl) LoadTaskStatus(taskId int32, state string, workerName stri
 	loadTask.Task.SetState(structure.TaskStateLoaded)
 	//TaskMgr.ForceState(loadTask.Task, structure.TaskStateLoaded)
 
+	// for scheduler, mark task complete
+	Scheduler.taskManager.MarkTaskComplete(taskId)
+
 	logrus.Infof("graph: %s, vertex: %d, edge: %d", graph.Name, graph.VertexCount, graph.EdgeCount)
 	for _, w := range graph.Workers {
 		logrus.Infof(
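Both hunks make the same change: once a task reaches a terminal state (Complete for compute tasks, Loaded for load tasks), the business layer tells the scheduler's task manager so the task drops out of future scheduling passes. Below is a minimal caller-side sketch of that pattern; the TaskManager type and its Queue method are illustrative stand-ins rather than Vermeer's actual APIs, and only the MarkTaskComplete call mirrors the diff.

package main

import "fmt"

// Hypothetical stand-in for the scheduler's task manager; illustrative only.
type TaskManager struct {
	notComplete map[int32]string // task ID -> task name
}

func NewTaskManager() *TaskManager {
	return &TaskManager{notComplete: make(map[int32]string)}
}

func (m *TaskManager) Queue(id int32, name string) {
	m.notComplete[id] = name
}

// MarkTaskComplete removes the task from the "still schedulable" view;
// in the diff, the full task record lives on in allTaskMap.
func (m *TaskManager) MarkTaskComplete(id int32) {
	delete(m.notComplete, id)
}

func main() {
	mgr := NewTaskManager()
	mgr.Queue(1, "load graph")
	mgr.Queue(2, "pagerank")

	// A task reaches a terminal state, so the business layer notifies the
	// scheduler, as compute_task.go and load_task.go now do.
	mgr.MarkTaskComplete(1)

	fmt.Println(mgr.notComplete) // map[2:pagerank]: only unfinished work remains
}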

vermeer/apps/master/bl/scheduler_bl.go

Lines changed: 18 additions & 1 deletion

@@ -134,19 +134,23 @@ func (s *ScheduleBl) tryScheduleInner(softSchedule bool, noLock ...bool) error {
 
 	// step 1: make sure all tasks have alloc to a worker group
 	// This is done by the TaskManager, which assigns a worker group to each task
+	s.taskManager.RefreshTaskToWorkerGroupMap()
 
 	// step 2: get available resources and tasks
 	logrus.Debugf("scheduling next tasks, softSchedule: %v", softSchedule)
 	idleWorkerGroups := s.resourceManager.GetIdleWorkerGroups()
 	concurrentWorkerGroups := s.resourceManager.GetConcurrentWorkerGroups()
-	allTasks := s.taskManager.GetAllTasks()
+	allTasks := s.taskManager.GetAllTasksNotComplete()
 	if len(allTasks) == 0 || (len(idleWorkerGroups) == 0 && len(concurrentWorkerGroups) == 0) {
 		logrus.Debugf("no available tasks or workerGroups, allTasks: %d, workerGroups: %d/%d",
 			len(allTasks), len(idleWorkerGroups), len(concurrentWorkerGroups))
 		return nil
 	}
 	logrus.Debugf("all tasks: %d, workerGroups: %d/%d", len(allTasks), len(idleWorkerGroups), len(concurrentWorkerGroups))
 
+	// TODO: NEED TO JUDGE IF THE TASK CAN CONCURRENTLY RUNNING
+	// NOT only by user setting, but also by scheduler setting
+
 	// step 3: return the task with the highest priority or small tasks which can be executed immediately
 	taskToWorkerGroupMap := s.taskManager.GetTaskToWorkerGroupMap()
 	nextTasks, err := s.algorithmManager.ScheduleNextTasks(allTasks, taskToWorkerGroupMap, idleWorkerGroups, concurrentWorkerGroups, softSchedule)
@@ -195,6 +199,19 @@ func (s *ScheduleBl) QueueTask(taskInfo *structure.TaskInfo) (bool, error) {
 
 	logrus.Debugf("queuing task %d with parameters: %+v", taskInfo.ID, taskInfo)
 
+	// check dependency if exists
+	if len(taskInfo.Preorders) > 0 {
+		for _, depTaskID := range taskInfo.Preorders {
+			depTask := taskMgr.GetTaskByID(depTaskID)
+			if depTask == nil {
+				err := errors.New("the dependency task with ID " + strconv.Itoa(int(depTaskID)) + " does not exist")
+				logrus.Error(err)
+				taskMgr.SetError(taskInfo, err.Error())
+				return false, err
+			}
+		}
+	}
+
 	// Notice: Ensure successful invocation.
 	// make sure all tasks have alloc to a worker group
 	ok, err := s.taskManager.QueueTask(taskInfo)
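The new guard in QueueTask rejects a task whose Preorders list references a task ID the manager has never seen, failing fast instead of queuing work that can never run. Below is a self-contained sketch of the same validation pattern; Task and Registry are hypothetical stand-ins for structure.TaskInfo and taskMgr.GetTaskByID, not Vermeer's real types.

package main

import (
	"errors"
	"fmt"
	"strconv"
)

// Illustrative task with preorder dependencies, loosely modeled on the
// Preorders field used in the diff.
type Task struct {
	ID        int32
	Preorders []int32
}

// Registry stands in for the task manager's lookup by ID.
type Registry map[int32]*Task

// validatePreorders mirrors the guard added to ScheduleBl.QueueTask: every
// dependency must already be known, otherwise queueing fails.
func (r Registry) validatePreorders(t *Task) error {
	for _, depID := range t.Preorders {
		if _, ok := r[depID]; !ok {
			return errors.New("the dependency task with ID " + strconv.Itoa(int(depID)) + " does not exist")
		}
	}
	return nil
}

func main() {
	reg := Registry{10: {ID: 10}}

	good := &Task{ID: 11, Preorders: []int32{10}}
	bad := &Task{ID: 12, Preorders: []int32{99}}

	fmt.Println(reg.validatePreorders(good)) // <nil>
	fmt.Println(reg.validatePreorders(bad))  // the dependency task with ID 99 does not exist
}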

vermeer/apps/master/schedules/scheduler_task_manager.go

Lines changed: 41 additions & 5 deletions

@@ -14,12 +14,15 @@ type SchedulerTaskManager struct {
 	allTaskMap     map[int32]*structure.TaskInfo
 	allTaskQueue   []*structure.TaskInfo
 	startTaskQueue []*structure.TaskInfo
+	// onGoingTasks
+	notCompleteTasks map[int32]*structure.TaskInfo
 	// A map from task ID to worker group can be used to track which worker group is handling which task.
 	taskToworkerGroupMap map[int32]string
 }
 
 func (t *SchedulerTaskManager) Init() *SchedulerTaskManager {
 	t.allTaskMap = make(map[int32]*structure.TaskInfo)
+	t.notCompleteTasks = make(map[int32]*structure.TaskInfo)
 	t.taskToworkerGroupMap = make(map[int32]string)
 	return t
 }

@@ -38,10 +41,23 @@ func (t *SchedulerTaskManager) QueueTask(taskInfo *structure.TaskInfo) (bool, er
 	// Add the task to the task map
 	t.allTaskMap[taskInfo.ID] = taskInfo
 	t.allTaskQueue = append(t.allTaskQueue, taskInfo)
+	t.notCompleteTasks[taskInfo.ID] = taskInfo
 	t.AssignGroup(taskInfo)
 	return true, nil
 }
 
+func (t *SchedulerTaskManager) RefreshTaskToWorkerGroupMap() {
+	defer t.Unlock(t.Lock())
+
+	for _, taskInfo := range t.GetAllTasksNotComplete() {
+		if taskInfo == nil {
+			continue
+		}
+		t.AssignGroup(taskInfo)
+		t.taskToworkerGroupMap[taskInfo.ID] = workerMgr.ApplyGroup(taskInfo.SpaceName, taskInfo.GraphName)
+	}
+}
+
 // Only for debug or test, get task start sequence
 func (t *SchedulerTaskManager) AddTaskStartSequence(taskID int32) error {
 	if _, exists := t.allTaskMap[taskID]; !exists {

@@ -64,6 +80,15 @@ func (t *SchedulerTaskManager) RemoveTask(taskID int32) error {
 		}
 	}
 	delete(t.taskToworkerGroupMap, taskID)
+	delete(t.notCompleteTasks, taskID)
+	return nil
+}
+
+func (t *SchedulerTaskManager) MarkTaskComplete(taskID int32) error {
+	if _, exists := t.allTaskMap[taskID]; !exists {
+		return errors.New("task not found")
+	}
+	delete(t.notCompleteTasks, taskID)
 	return nil
 }
 

@@ -106,9 +131,17 @@ func (t *SchedulerTaskManager) GetAllTasks() []*structure.TaskInfo {
 	return tasks
 }
 
+func (t *SchedulerTaskManager) GetAllTasksNotComplete() []*structure.TaskInfo {
+	tasks := make([]*structure.TaskInfo, 0, len(t.allTaskMap))
+	for _, task := range t.notCompleteTasks {
+		tasks = append(tasks, task)
+	}
+	return tasks
+}
+
 func (t *SchedulerTaskManager) GetAllTasksWaitng() []*structure.TaskInfo {
 	tasks := make([]*structure.TaskInfo, 0, len(t.allTaskMap))
-	for _, task := range t.allTaskMap {
+	for _, task := range t.GetAllTasksNotComplete() {
 		if task.State == structure.TaskStateWaiting {
 			tasks = append(tasks, task)
 		}

@@ -118,7 +151,7 @@ func (t *SchedulerTaskManager) GetAllTasksWaitng() []*structure.TaskInfo {
 
 func (t *SchedulerTaskManager) GetTasksInQueue(space string) []*structure.TaskInfo {
 	tasks := make([]*structure.TaskInfo, 0)
-	for _, task := range t.allTaskQueue {
+	for _, task := range t.GetAllTasksNotComplete() {
 		if task.SpaceName == space {
 			tasks = append(tasks, task)
 		}

@@ -153,9 +186,12 @@ func (t *SchedulerTaskManager) GetTaskStartSequence(queryTasks []int32) []*struc
 
 func (t *SchedulerTaskManager) GetTaskToWorkerGroupMap() map[int32]string {
 	// Return a copy of the worker group map to avoid external modifications
-	groupMap := make(map[int32]string, len(t.taskToworkerGroupMap))
-	for k, v := range t.taskToworkerGroupMap {
-		groupMap[k] = v
+	taskNotComplete := t.GetAllTasksNotComplete()
+	groupMap := make(map[int32]string, len(taskNotComplete))
+	for _, task := range taskNotComplete {
+		if group, exists := t.taskToworkerGroupMap[task.ID]; exists {
+			groupMap[task.ID] = group
+		}
 	}
 	return groupMap
 }
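Taken together, the manager now tracks completion in a second map instead of deleting the task record: a finished task stays in allTaskMap, so lookups and history keep working, but it disappears from every view the scheduler consults (GetAllTasksNotComplete, GetTasksInQueue, and the filtered copy returned by GetTaskToWorkerGroupMap). The following sketch shows those two-map semantics with simplified stand-in types, not Vermeer's real structures.

package main

import "fmt"

// Simplified stand-in for structure.TaskInfo; illustrative only.
type TaskInfo struct {
	ID    int32
	State string
}

// Two-map design from the diff: allTaskMap keeps every task ever queued,
// notComplete holds only tasks the scheduler should still consider.
type SchedulerTaskManager struct {
	allTaskMap  map[int32]*TaskInfo
	notComplete map[int32]*TaskInfo
	taskToGroup map[int32]string
}

func NewManager() *SchedulerTaskManager {
	return &SchedulerTaskManager{
		allTaskMap:  make(map[int32]*TaskInfo),
		notComplete: make(map[int32]*TaskInfo),
		taskToGroup: make(map[int32]string),
	}
}

func (m *SchedulerTaskManager) QueueTask(t *TaskInfo, group string) {
	m.allTaskMap[t.ID] = t
	m.notComplete[t.ID] = t
	m.taskToGroup[t.ID] = group
}

func (m *SchedulerTaskManager) MarkTaskComplete(id int32) {
	delete(m.notComplete, id) // record stays in allTaskMap
}

func (m *SchedulerTaskManager) GetAllTasksNotComplete() []*TaskInfo {
	tasks := make([]*TaskInfo, 0, len(m.notComplete))
	for _, t := range m.notComplete {
		tasks = append(tasks, t)
	}
	return tasks
}

// Like the patched GetTaskToWorkerGroupMap: the returned copy covers only
// tasks that are still incomplete.
func (m *SchedulerTaskManager) GetTaskToWorkerGroupMap() map[int32]string {
	out := make(map[int32]string, len(m.notComplete))
	for id := range m.notComplete {
		if g, ok := m.taskToGroup[id]; ok {
			out[id] = g
		}
	}
	return out
}

func main() {
	m := NewManager()
	m.QueueTask(&TaskInfo{ID: 1, State: "waiting"}, "group-a")
	m.QueueTask(&TaskInfo{ID: 2, State: "waiting"}, "group-b")

	m.MarkTaskComplete(1)

	fmt.Println(len(m.allTaskMap))               // 2: history intact
	fmt.Println(len(m.GetAllTasksNotComplete())) // 1: only task 2 is schedulable
	fmt.Println(m.GetTaskToWorkerGroupMap())     // map[2:group-b]
}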
