Skip to content

Commit 9145cb2

Browse files
committed
[feature] add system area
Signed-off-by: Stepan Paksashvili <stepan.paksashvili@flant.com>
1 parent 5612c62 commit 9145cb2

File tree

11 files changed

+285
-232
lines changed

11 files changed

+285
-232
lines changed

pkg/app/app.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ var (
5353
CRDsFilters = "doc-,_"
5454

5555
// NumberOfParallelQueues defines the number of precreated parallel queues for parallel execution
56-
NumberOfParallelQueues = 10
56+
NumberOfParallelQueues = 15
5757
ParallelQueuePrefix = "parallel_queue"
5858
ParallelQueueNamePattern = ParallelQueuePrefix + "_%d"
5959
)

pkg/module_manager/loader/fs/fs.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,8 @@ func (fl *FileSystemLoader) getBasicModule(definition moduleDefinition, commonSt
7171
return nil, fmt.Errorf("new basic module: %w", err)
7272
}
7373

74+
m.SetCritical(true)
75+
7476
return m, nil
7577
}
7678

pkg/module_manager/models/modules/basic.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,8 @@ type BasicModule struct {
5454
// required
5555
Path string
5656

57+
critical bool
58+
5759
crdsExist bool
5860
crdFilesPaths []string
5961

@@ -114,6 +116,10 @@ func (bm *BasicModule) WithLogger(logger *log.Logger) {
114116
bm.logger = logger
115117
}
116118

119+
func (bm *BasicModule) SetCritical(value bool) {
120+
bm.critical = value
121+
}
122+
117123
// getCRDsFromPath scan path/crds directory and store yaml file in slice
118124
// if file name do not start with `_` or `doc-` prefix
119125
func getCRDsFromPath(path string, crdsFilters string) []string {
@@ -183,6 +189,10 @@ func (bm *BasicModule) GetHooks(bt ...sh_op_types.BindingType) []*hooks.ModuleHo
183189
return bm.hooks.getHooks(bt...)
184190
}
185191

192+
func (bm *BasicModule) GetCritical() bool {
193+
return bm.critical
194+
}
195+
186196
// HasReadiness returns whether the module has a readiness probe configured.
187197
func (bm *BasicModule) HasReadiness() bool {
188198
return bm.hasReadiness

pkg/module_manager/module_manager.go

Lines changed: 35 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ import (
3030
"github.com/flant/addon-operator/pkg/module_manager/models/moduleset"
3131
"github.com/flant/addon-operator/pkg/module_manager/scheduler"
3232
"github.com/flant/addon-operator/pkg/module_manager/scheduler/extenders"
33+
bootstrapped_extender "github.com/flant/addon-operator/pkg/module_manager/scheduler/extenders/bootstrapped"
3334
dynamic_extender "github.com/flant/addon-operator/pkg/module_manager/scheduler/extenders/dynamically_enabled"
3435
kube_config_extender "github.com/flant/addon-operator/pkg/module_manager/scheduler/extenders/kube_config"
3536
script_extender "github.com/flant/addon-operator/pkg/module_manager/scheduler/extenders/script_enabled"
@@ -56,6 +57,8 @@ const (
5657
moduleMaintenanceMetricName = "{PREFIX}mm_module_maintenance"
5758

5859
moduleManagerServiceName = "module-manager"
60+
61+
bootstrappedValueSection = "clusterIsBootstrapped"
5962
)
6063

6164
// ModulesState determines which modules should be enabled, disabled or reloaded.
@@ -377,12 +380,30 @@ func (mm *ModuleManager) Init(logger *log.Logger) error {
377380
return fmt.Errorf("couldn't add scrpt_enabled extender: %w", err)
378381
}
379382

380-
// by this point we must have all required scheduler extenders attached
383+
bootstrappedExtender := bootstrapped_extender.NewExtender(func() (bool, error) {
384+
value, ok := mm.global.GetValues(false)[bootstrappedValueSection]
385+
if !ok {
386+
return false, nil
387+
}
388+
389+
bootstrapped, ok := value.(bool)
390+
if !ok {
391+
return false, errors.New("bootstrapped value not boolean")
392+
}
393+
394+
return bootstrapped, nil
395+
})
396+
397+
if err = mm.moduleScheduler.AddExtender(bootstrappedExtender); err != nil {
398+
return fmt.Errorf("couldn't add bootstrapped extender: %w", err)
399+
}
400+
401+
// by this point, we must have all required scheduler extenders attached
381402
if err := mm.moduleScheduler.ApplyExtenders(app.AppliedExtenders); err != nil {
382403
return fmt.Errorf("couldn't apply extenders to the module scheduler: %w", err)
383404
}
384405

385-
return mm.registerModules(scriptEnabledExtender)
406+
return mm.registerModules(scriptEnabledExtender, bootstrappedExtender)
386407
}
387408

388409
func (mm *ModuleManager) GetKubeConfigValid() bool {
@@ -1392,7 +1413,7 @@ func queueHasPendingModuleDeleteTask(q *queue.TaskQueue, moduleName string) bool
13921413
} */
13931414

13941415
// registerModules load all available modules from modules directory.
1395-
func (mm *ModuleManager) registerModules(scriptEnabledExtender *script_extender.Extender) error {
1416+
func (mm *ModuleManager) registerModules(scriptEnabledExtender *script_extender.Extender, bootstrappedExtender *bootstrapped_extender.Extender) error {
13961417
if mm.ModulesDir == "" {
13971418
mm.logger.Warn("empty modules directory is passed, no modules to load")
13981419

@@ -1434,10 +1455,15 @@ func (mm *ModuleManager) registerModules(scriptEnabledExtender *script_extender.
14341455
mod.WithDependencies(dep)
14351456
set.Add(mod)
14361457

1437-
if err := mm.moduleScheduler.AddModuleVertex(mod); err != nil {
1458+
if err = mm.moduleScheduler.AddModuleVertex(mod); err != nil {
14381459
return fmt.Errorf("add module vertex: %w", err)
14391460
}
14401461

1462+
// functional modules require bootstrapped cluster
1463+
if !mod.GetCritical() {
1464+
bootstrappedExtender.AddFunctionalModule(mod.GetName())
1465+
}
1466+
14411467
scriptEnabledExtender.AddBasicModule(mod)
14421468

14431469
mm.SendModuleEvent(events.ModuleEvent{
@@ -1446,8 +1472,11 @@ func (mm *ModuleManager) registerModules(scriptEnabledExtender *script_extender.
14461472
})
14471473
}
14481474

1449-
mm.logger.Debug("Found modules",
1450-
slog.Any("modules", set.NamesInOrder()))
1475+
if err = mm.moduleScheduler.Initialize(); err != nil {
1476+
return fmt.Errorf("initialize scheduler: %w", err)
1477+
}
1478+
1479+
mm.logger.Debug("Found modules", slog.Any("modules", set.NamesInOrder()))
14511480

14521481
mm.l.Lock()
14531482
mm.modules = set
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
package bootstrapped
2+
3+
import (
4+
"k8s.io/utils/ptr"
5+
6+
"github.com/flant/addon-operator/pkg/module_manager/scheduler/extenders"
7+
exterr "github.com/flant/addon-operator/pkg/module_manager/scheduler/extenders/error"
8+
)
9+
10+
const (
11+
Name extenders.ExtenderName = "Bootstrapped"
12+
)
13+
14+
type Extender struct {
15+
// check if the cluster bootstrapped
16+
isBootstrapped func() (bool, error)
17+
// functional modules require bootstrapped cluster
18+
modules map[string]struct{}
19+
}
20+
21+
func NewExtender(helper func() (bool, error)) *Extender {
22+
return &Extender{
23+
modules: make(map[string]struct{}),
24+
isBootstrapped: helper,
25+
}
26+
}
27+
28+
func (e *Extender) Name() extenders.ExtenderName {
29+
return Name
30+
}
31+
32+
func (e *Extender) Filter(moduleName string, _ map[string]string) (*bool, error) {
33+
if _, ok := e.modules[moduleName]; ok {
34+
bootstrapped, err := e.isBootstrapped()
35+
if err != nil {
36+
return nil, exterr.Permanent(err)
37+
}
38+
39+
// enable functional modules only if the cluster bootstrapped
40+
return ptr.To(bootstrapped), nil
41+
}
42+
43+
return nil, nil
44+
}
45+
46+
func (e *Extender) IsTerminator() bool {
47+
return true
48+
}
49+
50+
func (e *Extender) AddFunctionalModule(moduleName string) {
51+
e.modules[moduleName] = struct{}{}
52+
}

pkg/module_manager/scheduler/extenders/mock/extenders_mock.go

Lines changed: 2 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -127,28 +127,8 @@ func (f *TopologicalOne) GetTopologicalHints(moduleName string) []string {
127127
return []string{"foo", "bar"}
128128
case "operator-trivy":
129129
return []string{"istio", "admission-policy-engine"}
130-
}
131-
132-
return nil
133-
}
134-
135-
type TopologicalTwo struct{}
136-
137-
func (f *TopologicalTwo) Name() extenders.ExtenderName {
138-
return extenders.ExtenderName("TopologicalTwo")
139-
}
140-
141-
func (f *TopologicalTwo) Filter(_ string, _ map[string]string) (*bool, error) {
142-
return nil, nil
143-
}
144-
145-
func (f *TopologicalTwo) IsTerminator() bool {
146-
return true
147-
}
148-
149-
func (f *TopologicalTwo) GetTopologicalHints(moduleName string) []string {
150-
if moduleName == "my-module" {
151-
return []string{"unknown-module"}
130+
case "cilium-hubble":
131+
return []string{"cni-cilium"}
152132
}
153133

154134
return nil

pkg/module_manager/scheduler/node/mock/node_mock.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
)
77

88
type MockModule struct {
9+
Critical bool
910
EnabledScriptResult bool
1011
EnabledScriptErr error
1112
EnabledModules *[]string
@@ -27,6 +28,10 @@ func (m MockModule) GetOrder() uint32 {
2728
return m.Order
2829
}
2930

31+
func (m MockModule) GetCritical() bool {
32+
return m.Critical
33+
}
34+
3035
func (m MockModule) RunEnabledScript(_ context.Context, _ string, _ []string, _ map[string]string) (bool, error) {
3136
if m.EnabledScriptErr != nil {
3237
return false, m.EnabledScriptErr

pkg/module_manager/scheduler/node/node.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ type ModuleInterface interface {
1010
GetName() string
1111
GetOrder() uint32
1212
GetPath() string
13+
GetCritical() bool
1314
}
1415

1516
type NodeType string

0 commit comments

Comments
 (0)