-
Notifications
You must be signed in to change notification settings - Fork 27
Expand file tree
/
Copy pathscheduler.go
More file actions
212 lines (178 loc) · 4.58 KB
/
scheduler.go
File metadata and controls
212 lines (178 loc) · 4.58 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
package functional
import (
"context"
"fmt"
"log/slog"
"sync"
"github.com/deckhouse/deckhouse/pkg/log"
"github.com/flant/addon-operator/pkg/app"
"github.com/flant/addon-operator/pkg/task"
sh_task "github.com/flant/shell-operator/pkg/task"
)
const (
	// Root is the empty module name. Reporting it as done marks nothing as
	// finished and only runs a scheduling pass, which starts every module
	// whose dependencies are already satisfied (on a fresh scheduler: the
	// modules without dependencies).
	Root = ""

	// channelsBuffer is the capacity of the done and process channels.
	channelsBuffer = 32
)
// Scheduler processes functional modules: it waits until a module's
// dependencies are done and then creates a ModuleRun task for the module in
// one of the parallel queues.
type Scheduler struct {
	// queueService receives the ModuleRun tasks built for ready modules.
	queueService queueService
	// logger is used for debug and error reporting.
	logger *log.Logger

	// mtx guards requests, done and scheduled.
	mtx sync.Mutex
	// requests holds the most recently added request per module name.
	requests map[string]*Request
	// done is the set of modules reported as finished.
	done map[string]struct{}
	// scheduled is the set of modules handed to the process loop that have
	// not been reported as finished yet.
	scheduled map[string]struct{}

	// doneCh carries finished module names to the schedule loop.
	doneCh chan string
	// processCh carries ready requests to the process loop.
	processCh chan *Request
}
// queueService is the dependency the scheduler uses to enqueue tasks. The
// scheduler calls AddLastTaskToQueue with the target queue name and the
// ModuleRun task it built for a module.
type queueService interface {
	AddLastTaskToQueue(queueName string, t sh_task.Task) error
}
// Request describes a module to run together with the options that are copied
// into its ModuleRun task.
type Request struct {
	// Name is the module name; it is the key the scheduler tracks the
	// request by and the ModuleName of the resulting task.
	Name string
	// Description becomes the task's EventDescription.
	Description string
	// Dependencies lists the module names that must all be done before this
	// module is scheduled.
	Dependencies []string
	// IsReloadAll is copied into the task metadata as IsReloadAll.
	IsReloadAll bool
	// DoStartup is copied into the task metadata as DoModuleStartup.
	DoStartup bool
	// Labels are attached to the task as log labels.
	Labels map[string]string
}
// NewScheduler builds a Scheduler around the given queue service and logger
// and starts its two background goroutines: the schedule loop, which reacts
// to finished modules, and the process loop, which turns ready requests into
// queued tasks. Both loops return once ctx is cancelled.
func NewScheduler(ctx context.Context, qService queueService, logger *log.Logger) *Scheduler {
	sched := &Scheduler{
		queueService: qService,
		logger:       logger,
		requests:     make(map[string]*Request),
		done:         make(map[string]struct{}),
		scheduled:    make(map[string]struct{}),
		doneCh:       make(chan string, channelsBuffer),
		processCh:    make(chan *Request, channelsBuffer),
	}

	go sched.runScheduleLoop(ctx)
	go sched.runProcessLoop(ctx)

	return sched
}
// runScheduleLoop receives finished module names from doneCh and runs a
// rescheduling pass for each of them. It returns when ctx is cancelled.
func (s *Scheduler) runScheduleLoop(ctx context.Context) {
	stop := ctx.Done()
	for {
		select {
		case moduleName := <-s.doneCh:
			s.reschedule(moduleName)
		case <-stop:
			return
		}
	}
}
// runProcessLoop receives ready requests from processCh and hands each one to
// handleRequest together with a running counter, which handleRequest uses to
// pick the target queue. It returns when ctx is cancelled.
func (s *Scheduler) runProcessLoop(ctx context.Context) {
	for handled := 0; ; handled++ {
		select {
		case <-ctx.Done():
			return
		case next := <-s.processCh:
			s.handleRequest(handled, next)
		}
	}
}
// Add registers the given requests. A request replaces any earlier request
// with the same name, and the module's done and scheduled marks are cleared
// so that it is considered again by the next scheduling pass.
//
// Add itself does not trigger scheduling; that happens when a name is
// reported through Done.
func (s *Scheduler) Add(reqs ...*Request) {
	s.mtx.Lock()
	defer s.mtx.Unlock()

	for i := range reqs {
		r := reqs[i]
		s.logger.Debug("add request", slog.Any("request", r))

		name := r.Name
		// forget any previous progress for this module
		delete(s.scheduled, name)
		delete(s.done, name)
		// store the latest request
		s.requests[name] = r
	}
}
// Remove forgets the module entirely: its request is dropped and its done and
// scheduled marks are cleared. A task that was already queued for the module
// is not cancelled.
// TODO(ipaqsa): stop module run task
func (s *Scheduler) Remove(name string) {
	s.mtx.Lock()
	defer s.mtx.Unlock()

	delete(s.requests, name)
	delete(s.scheduled, name)
	delete(s.done, name)
}
// reschedule records that the module named done has finished and hands every
// module that is now ready to the process loop.
//
// Passing Root skips the mark-done step and only runs the scheduling pass.
// A name that is not currently scheduled (for example because it was re-added
// or removed while running) is ignored and no scheduling pass is run.
//
// The ready set is computed and marked under the mutex, but the sends to
// processCh happen after the mutex is released: the channel is bounded, so a
// send may block, and blocking while holding the mutex would stall Add,
// Remove and Finished (indefinitely if the process loop has already stopped).
func (s *Scheduler) reschedule(done string) {
	for _, req := range s.collectReady(done) {
		s.processCh <- req
	}
}

// collectReady marks done as finished (unless it is Root) and returns the
// requests whose dependencies are all done, marking each one as scheduled.
func (s *Scheduler) collectReady(done string) []*Request {
	s.mtx.Lock()
	defer s.mtx.Unlock()

	if done != Root {
		// skip unscheduled
		if _, ok := s.scheduled[done]; !ok {
			return nil
		}

		// move the module from scheduled to done
		s.done[done] = struct{}{}
		delete(s.scheduled, done)
	}

	var ready []*Request
	for _, req := range s.requests {
		// skip processed
		if _, ok := s.done[req.Name]; ok {
			continue
		}

		// skip scheduled
		if _, ok := s.scheduled[req.Name]; ok {
			continue
		}

		// a module is ready only when every dependency is done
		satisfied := true
		for _, dep := range req.Dependencies {
			if _, ok := s.done[dep]; !ok {
				satisfied = false
				break
			}
		}
		if !satisfied {
			continue
		}

		s.logger.Debug("trigger scheduling", slog.String("scheduled", req.Name), slog.Any("done", done))
		s.scheduled[req.Name] = struct{}{}
		ready = append(ready, req)
	}

	return ready
}
// handleRequest builds a ModuleRun task from the request and appends it to a
// parallel queue chosen from idx. A failure to enqueue is logged and not
// retried.
//
// NOTE(review): the modulus is NumberOfParallelQueues-1, so for non-negative
// idx the queue index never reaches NumberOfParallelQueues-1 — confirm this
// is intended.
func (s *Scheduler) handleRequest(idx int, req *Request) {
	queueIdx := idx % (app.NumberOfParallelQueues - 1)
	queueName := fmt.Sprintf(app.ParallelQueueNamePattern, queueIdx)

	meta := task.HookMetadata{
		EventDescription: req.Description,
		ModuleName:       req.Name,
		DoModuleStartup:  req.DoStartup,
		IsReloadAll:      req.IsReloadAll,
	}

	moduleTask := sh_task.NewTask(task.ModuleRun).
		WithLogLabels(req.Labels).
		WithQueueName(queueName).
		WithMetadata(meta)

	err := s.queueService.AddLastTaskToQueue(queueName, moduleTask)
	if err == nil {
		return
	}

	s.logger.Error("add last task to queue", slog.String("queue", queueName), slog.Any("error", err))
}
// Done reports that processing of the named module has finished by sending
// the name to the schedule loop. Pass Root to trigger a scheduling pass
// without marking any module done. The send blocks while the done channel's
// buffer is full. Done is a no-op when the done channel is nil.
func (s *Scheduler) Done(name string) {
	if s.doneCh == nil {
		return
	}

	s.doneCh <- name
}
// Finished reports whether the number of done modules equals the number of
// registered requests.
func (s *Scheduler) Finished() bool {
	s.mtx.Lock()
	defer s.mtx.Unlock()

	total, completed := len(s.requests), len(s.done)
	return completed == total
}