Skip to content

Commit dad0b39

Browse files
committed
chore: add some comments
1 parent c00bd80 commit dad0b39

File tree

11 files changed

+374
-16
lines changed

11 files changed

+374
-16
lines changed

vermeer/apps/master/bl/scheduler_bl.go

Lines changed: 48 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,10 @@ import (
2828
"github.com/sirupsen/logrus"
2929
)
3030

31+
/*
32+
* @Description: ScheduleBl is the scheduler business logic.
33+
* @Note: This is the main scheduler business logic.
34+
*/
3135
type ScheduleBl struct {
3236
structure.MutexLocker
3337
// resource management
@@ -46,6 +50,10 @@ type ScheduleBl struct {
4650
softSchedule bool
4751
}
4852

53+
/*
54+
* @Description: Init initializes the ScheduleBl.
55+
* @Note: This function will initialize the ScheduleBl.
56+
*/
4957
func (s *ScheduleBl) Init() {
5058
logrus.Info("Initializing ScheduleBl...")
5159
s.LoadConfig()
@@ -64,6 +72,10 @@ func (s *ScheduleBl) Init() {
6472
go s.waitingStartedTask()
6573
}
6674

75+
/*
76+
* @Description: LoadConfig loads the configuration from the common package.
77+
* @Note: This function will load the configuration from the common package.
78+
*/
6779
func (s *ScheduleBl) LoadConfig() {
6880
// Load configuration from common package
6981

@@ -102,6 +114,10 @@ func (s *ScheduleBl) LoadConfig() {
102114
s.startChanSize, s.tickerInterval, s.softSchedule)
103115
}
104116

117+
/*
118+
* @Description: startTicker starts the ticker.
119+
* @Note: This function will start the ticker.
120+
*/
105121
func (s *ScheduleBl) startTicker() {
106122
// Create a ticker with the specified interval
107123
ticker := time.Tick(time.Duration(s.tickerInterval) * time.Second)
@@ -113,6 +129,11 @@ func (s *ScheduleBl) startTicker() {
113129
}
114130

115131
// this make scheduler manager try to schedule next tasks
132+
/*
133+
* @Description: TryScheduleNextTasks tries to schedule the next tasks.
134+
* @Note: This function will try to schedule the next tasks.
135+
* @Param noLock
136+
*/
116137
func (s *ScheduleBl) TryScheduleNextTasks(noLock ...bool) {
117138
defer func() {
118139
if err := recover(); err != nil {
@@ -126,6 +147,12 @@ func (s *ScheduleBl) TryScheduleNextTasks(noLock ...bool) {
126147
}
127148

128149
// Main routine to schedule tasks
150+
/*
151+
* @Description: tryScheduleInner tries to schedule the next tasks.
152+
* @Note: This function will try to schedule the next tasks.
153+
* @Param softSchedule
154+
* @Param noLock
155+
*/
129156
func (s *ScheduleBl) tryScheduleInner(softSchedule bool, noLock ...bool) error {
130157
// Implement logic to get the next task in the queue for the given space
131158
if !(len(noLock) > 0 && noLock[0]) {
@@ -183,6 +210,12 @@ func (s *ScheduleBl) tryScheduleInner(softSchedule bool, noLock ...bool) error {
183210

184211
// QueueTask Add the task to the inner queue.
185212
// If the task exists, return false.
213+
/*
214+
* @Description: QueueTask queues the task.
215+
* @Note: This function will queue the task.
216+
* @Param taskInfo
217+
* @Return bool, error
218+
*/
186219
func (s *ScheduleBl) QueueTask(taskInfo *structure.TaskInfo) (bool, error) {
187220
if taskInfo == nil {
188221
return false, errors.New("the argument `taskInfo` is nil")
@@ -231,6 +264,12 @@ func (s *ScheduleBl) QueueTask(taskInfo *structure.TaskInfo) (bool, error) {
231264
return ok, nil
232265
}
233266

267+
/*
268+
* @Description: QueueTaskFromTemplate queues the task from the template.
269+
* @Note: This function will queue the task from the template. This function is used by cron tasks.
270+
* @Param template
271+
* @Return int32, error
272+
*/
234273
func (s *ScheduleBl) QueueTaskFromTemplate(template *structure.TaskInfo) (int32, error) {
235274
if template == nil {
236275
return -1, errors.New("the argument `template` is nil")
@@ -255,6 +294,12 @@ func (s *ScheduleBl) QueueTaskFromTemplate(template *structure.TaskInfo) (int32,
255294
return taskInfo.ID, nil
256295
}
257296

297+
/*
298+
* @Description: BatchQueueTask batches the task.
299+
* @Note: This function will batch the task.
300+
* @Param taskInfos
301+
* @Return []bool, []error
302+
*/
258303
func (s *ScheduleBl) BatchQueueTask(taskInfos []*structure.TaskInfo) ([]bool, []error) {
259304
if len(taskInfos) == 0 {
260305
return []bool{}, []error{}
@@ -281,7 +326,6 @@ func (s *ScheduleBl) BatchQueueTask(taskInfos []*structure.TaskInfo) ([]bool, []
281326
}
282327

283328
// ******** CloseCurrent ********
284-
285329
func (s *ScheduleBl) CloseCurrent(taskId int32, removeWorkerName ...string) error {
286330
defer s.Unlock(s.Lock())
287331

@@ -307,6 +351,8 @@ func (s *ScheduleBl) CloseCurrent(taskId int32, removeWorkerName ...string) erro
307351
return nil
308352
}
309353

354+
// This will be called when a worker is offline.
355+
// This will be called when a worker is online.
310356
func (s *ScheduleBl) ChangeWorkerStatus(workerName string, status schedules.WorkerOngoingStatus) (bool, error) {
311357
defer s.Unlock(s.Lock())
312358
s.resourceManager.ChangeWorkerStatus(workerName, status)
@@ -406,7 +452,7 @@ func (s *ScheduleBl) startWaitingTask(agent *schedules.Agent, taskInfo *structur
406452

407453
// ********* CANCEL TASK ********
408454
// handle cancel task
409-
455+
// need to cancel cron task
410456
func (s *ScheduleBl) CancelTask(taskInfo *structure.TaskInfo) error {
411457
if taskInfo == nil {
412458
return errors.New("the argument `taskInfo` is nil")

vermeer/apps/master/schedules/scheduler_algorithm_manager.go

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,10 @@ import (
1111
"github.com/sirupsen/logrus"
1212
)
1313

14+
/*
15+
* @Description: SchedulerAlgorithm is the interface for the scheduler algorithm.
16+
* @Note: This is the interface for the scheduler algorithm.
17+
*/
1418
type SchedulerAlgorithm interface {
1519
// Name returns the name of the SchedulerAlgorithm
1620
Name() string
@@ -22,12 +26,21 @@ type SchedulerAlgorithm interface {
2226
ScheduleNextTasks(filteredTasks []*structure.TaskInfo, taskToWorkerGroupMap map[int32]string, idleWorkerGroups []string, concurrentWorkerGroups []string, softSchedule bool) ([]*structure.TaskInfo, error)
2327
}
2428

29+
/*
30+
* @Description: SchedulerAlgorithmManager is the manager for the scheduler algorithm.
31+
* @Note: This is the manager for the scheduler algorithm.
32+
*/
2533
type SchedulerAlgorithmManager struct {
2634
filteredSchedulerAlgorithms map[string]SchedulerAlgorithm
2735
scheduledSchedulerAlgorithms map[string]SchedulerAlgorithm
2836
dispatchPaused bool
2937
}
3038

39+
/*
40+
* @Description: Init initializes the SchedulerAlgorithmManager.
41+
* @Note: This function will initialize the SchedulerAlgorithmManager.
42+
*/
43+
// Need to put DependsSchedulerAlgorithm before WaitingSchedulerAlgorithm
3144
func (am *SchedulerAlgorithmManager) Init() {
3245
am.filteredSchedulerAlgorithms = make(map[string]SchedulerAlgorithm)
3346
am.scheduledSchedulerAlgorithms = make(map[string]SchedulerAlgorithm)
@@ -39,6 +52,11 @@ func (am *SchedulerAlgorithmManager) Init() {
3952
am.RegisterSchedulerAlgorithm(&PriorityElderSchedulerAlgorithm{})
4053
}
4154

55+
/*
56+
* @Description: RegisterSchedulerAlgorithm registers the scheduler algorithm.
57+
* @Note: This function will register the scheduler algorithm.
58+
* @Param schedulerAlgorithm
59+
*/
4260
func (am *SchedulerAlgorithmManager) RegisterSchedulerAlgorithm(schedulerAlgorithm SchedulerAlgorithm) {
4361
if schedulerAlgorithm == nil {
4462
return
@@ -56,6 +74,11 @@ func (am *SchedulerAlgorithmManager) RegisterSchedulerAlgorithm(schedulerAlgorit
5674
am.scheduledSchedulerAlgorithms[name] = schedulerAlgorithm
5775
}
5876

77+
/*
78+
* @Description: RegisterFilterAlgorithm registers the filter algorithm.
79+
* @Note: This function will register the filter algorithm.
80+
* @Param filterAlgorithm
81+
*/
5982
func (am *SchedulerAlgorithmManager) RegisterFilterAlgorithm(filterAlgorithm SchedulerAlgorithm) {
6083
if filterAlgorithm == nil {
6184
return
@@ -68,18 +91,43 @@ func (am *SchedulerAlgorithmManager) RegisterFilterAlgorithm(filterAlgorithm Sch
6891
am.filteredSchedulerAlgorithms[name] = filterAlgorithm
6992
}
7093

94+
/*
95+
* @Description: IsDispatchPaused checks if the dispatch is paused.
96+
* @Note: This function will check if the dispatch is paused.
97+
* @Return bool
98+
*/
7199
func (am *SchedulerAlgorithmManager) IsDispatchPaused() bool {
72100
return am.dispatchPaused
73101
}
74102

103+
/*
104+
* @Description: PauseDispatch pauses the dispatch.
105+
* @Note: This function will pause the dispatch.
106+
*/
75107
func (am *SchedulerAlgorithmManager) PauseDispatch() {
76108
am.dispatchPaused = true
77109
}
78110

111+
/*
112+
* @Description: ResumeDispatch resumes the dispatch.
113+
* @Note: This function will resume the dispatch.
114+
*/
79115
func (am *SchedulerAlgorithmManager) ResumeDispatch() {
80116
am.dispatchPaused = false
81117
}
82118

119+
/*
120+
* @Description: ScheduleNextTasks schedules the next tasks.
121+
* @Note: This function will schedule the next tasks.
122+
* @Param allTasks
123+
* @Param taskToWorkerGroupMap
124+
* @Param idleWorkerGroups
125+
* @Param concurrentWorkerGroups
126+
* @Param softSchedule
127+
* @Return []*structure.TaskInfo, error
128+
*/
129+
// For all tasks, filter and schedule them
130+
// Only one scheduling algorithm is supported for now
83131
func (am *SchedulerAlgorithmManager) ScheduleNextTasks(allTasks []*structure.TaskInfo, taskToWorkerGroupMap map[int32]string, idleWorkerGroups []string, concurrentWorkerGroups []string, softSchedule bool) ([]*structure.TaskInfo, error) {
84132
if am.dispatchPaused {
85133
return nil, nil // No tasks to schedule if dispatch is paused

vermeer/apps/master/schedules/scheduler_cron_manager.go

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,20 +8,36 @@ import (
88
"github.com/sirupsen/logrus"
99
)
1010

11+
/*
12+
* @Description: SchedulerCronManager is the manager for the scheduler cron.
13+
* @Note: This is the manager for the scheduler cron.
14+
*/
1115
type SchedulerCronManager struct {
1216
cronTasks map[int32][]*structure.TaskInfo // cron expression to TaskInfo. Origin task ID to copied tasks
1317
crons map[int32][]*cron.Cron // cron expression to cron jobs
1418
// queueTemplateHandler is a function that handles the task queue
1519
queueTemplateHandler func(*structure.TaskInfo) (int32, error)
1620
}
1721

22+
/*
23+
* @Description: Init initializes the SchedulerCronManager.
24+
* @Note: This function will initialize the SchedulerCronManager.
25+
* @Param queueTemplateHandler
26+
* @Return *SchedulerCronManager
27+
*/
1828
func (t *SchedulerCronManager) Init(queueTemplateHandler func(*structure.TaskInfo) (int32, error)) *SchedulerCronManager {
1929
t.cronTasks = make(map[int32][]*structure.TaskInfo)
2030
t.crons = make(map[int32][]*cron.Cron)
2131
t.queueTemplateHandler = queueTemplateHandler
2232
return t
2333
}
2434

35+
/*
36+
* @Description: CheckCronExpression checks the cron expression.
37+
* @Note: This function will check the cron expression.
38+
* @Param cronExpr
39+
* @Return error
40+
*/
2541
func (t *SchedulerCronManager) CheckCronExpression(cronExpr string) error {
2642
if cronExpr == "" {
2743
return errors.New("cron expression is empty")
@@ -33,6 +49,12 @@ func (t *SchedulerCronManager) CheckCronExpression(cronExpr string) error {
3349
return nil
3450
}
3551

52+
/*
53+
* @Description: AddCronTask adds the cron task.
54+
* @Note: This function will add the cron task.
55+
* @Param taskInfo
56+
* @Return error
57+
*/
3658
func (t *SchedulerCronManager) AddCronTask(taskInfo *structure.TaskInfo) error {
3759
if taskInfo == nil {
3860
return errors.New("the argument `taskInfo` is nil")
@@ -69,6 +91,12 @@ func (t *SchedulerCronManager) AddCronTask(taskInfo *structure.TaskInfo) error {
6991
return nil
7092
}
7193

94+
/*
95+
* @Description: DeleteTask deletes the cron task.
96+
* @Note: This function will delete the cron task.
97+
* @Param taskID
98+
* @Return error
99+
*/
72100
func (t *SchedulerCronManager) DeleteTask(taskID int32) error {
73101
if _, exists := t.cronTasks[taskID]; !exists {
74102
return errors.New("task not found in cron tasks")
@@ -83,6 +111,13 @@ func (t *SchedulerCronManager) DeleteTask(taskID int32) error {
83111
return nil
84112
}
85113

114+
/*
115+
* @Description: DeleteTaskByGraph deletes the cron task by graph.
116+
* @Note: This function will delete the cron task by graph.
117+
* @Param spaceName
118+
* @Param graphName
119+
* @Return error
120+
*/
86121
func (t *SchedulerCronManager) DeleteTaskByGraph(spaceName, graphName string) error {
87122
if spaceName == "" || graphName == "" {
88123
return errors.New("the argument `spaceName` or `graphName` is empty")

0 commit comments

Comments
 (0)