@@ -18,15 +18,13 @@ package consumergroup
1818
1919import (
2020 "context"
21- "encoding/json"
2221 "fmt"
2322 "strings"
2423 "time"
2524
2625 v1 "k8s.io/client-go/informers/core/v1"
2726
2827 "github.com/kelseyhightower/envconfig"
29- "go.uber.org/multierr"
3028 "go.uber.org/zap"
3129 appsv1 "k8s.io/api/apps/v1"
3230 corev1 "k8s.io/api/core/v1"
@@ -45,7 +43,6 @@ import (
4543 kubeclient "knative.dev/pkg/client/injection/kube/client"
4644 "knative.dev/pkg/client/injection/kube/informers/apps/v1/statefulset"
4745 configmapinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/configmap"
48- nodeinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/node"
4946 podinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/pod/filtered"
5047 secretinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/secret"
5148 "knative.dev/pkg/configmap"
@@ -93,11 +90,9 @@ type envConfig struct {
9390}
9491
9592type SchedulerConfig struct {
96- StatefulSetName string
97- RefreshPeriod time.Duration
98- Capacity int32
99- SchedulerPolicy * scheduler.SchedulerPolicy
100- DeSchedulerPolicy * scheduler.SchedulerPolicy
93+ StatefulSetName string
94+ RefreshPeriod time.Duration
95+ Capacity int32
10196}
10297
10398func NewController (ctx context.Context , watcher configmap.Watcher ) * controller.Impl {
@@ -109,10 +104,8 @@ func NewController(ctx context.Context, watcher configmap.Watcher) *controller.I
109104 }
110105
111106 c := SchedulerConfig {
112- RefreshPeriod : time .Duration (env .SchedulerRefreshPeriod ) * time .Second ,
113- Capacity : env .PodCapacity ,
114- SchedulerPolicy : schedulerPolicyFromConfigMapOrFail (ctx , env .SchedulerPolicyConfigMap ),
115- DeSchedulerPolicy : schedulerPolicyFromConfigMapOrFail (ctx , env .DeSchedulerPolicyConfigMap ),
107+ RefreshPeriod : time .Duration (env .SchedulerRefreshPeriod ) * time .Second ,
108+ Capacity : env .PodCapacity ,
116109 }
117110
118111 dispatcherPodInformer := podinformer .Get (ctx , internalsapi .DispatcherLabelSelectorStr )
@@ -332,11 +325,9 @@ func createKafkaScheduler(ctx context.Context, c SchedulerConfig, ssName string,
332325 return createStatefulSetScheduler (
333326 ctx ,
334327 SchedulerConfig {
335- StatefulSetName : ssName ,
336- RefreshPeriod : c .RefreshPeriod ,
337- Capacity : c .Capacity ,
338- SchedulerPolicy : c .SchedulerPolicy ,
339- DeSchedulerPolicy : c .DeSchedulerPolicy ,
328+ StatefulSetName : ssName ,
329+ RefreshPeriod : c .RefreshPeriod ,
330+ Capacity : c .Capacity ,
340331 },
341332 func () ([]scheduler.VPod , error ) {
342333 consumerGroups , err := lister .List (labels .SelectorFromSet (getSelectorLabel (ssName )))
@@ -380,12 +371,8 @@ func createStatefulSetScheduler(ctx context.Context, c SchedulerConfig, lister s
380371 ScaleCacheConfig : scheduler.ScaleCacheConfig {RefreshPeriod : statefulSetScaleCacheRefreshPeriod },
381372 PodCapacity : c .Capacity ,
382373 RefreshPeriod : c .RefreshPeriod ,
383- SchedulerPolicy : scheduler .MAXFILLUP ,
384- SchedPolicy : c .SchedulerPolicy ,
385- DeschedPolicy : c .DeSchedulerPolicy ,
386374 Evictor : newEvictor (ctx , zap .String ("kafka.eventing.knative.dev/component" , "evictor" )).evict ,
387375 VPodLister : lister ,
388- NodeLister : nodeinformer .Get (ctx ).Lister (),
389376 PodLister : dispatcherPodInformer .Lister ().Pods (system .Namespace ()),
390377 })
391378
@@ -394,60 +381,3 @@ func createStatefulSetScheduler(ctx context.Context, c SchedulerConfig, lister s
394381 SchedulerConfig : c ,
395382 }
396383}
397-
398- // schedulerPolicyFromConfigMapOrFail reads predicates and priorities data from configMap
399- func schedulerPolicyFromConfigMapOrFail (ctx context.Context , configMapName string ) * scheduler.SchedulerPolicy {
400- p , err := schedulerPolicyFromConfigMap (ctx , configMapName )
401- if err != nil {
402- logging .FromContext (ctx ).Fatal (zap .Error (err ))
403- }
404- return p
405- }
406-
407- // schedulerPolicyFromConfigMap reads predicates and priorities data from configMap
408- func schedulerPolicyFromConfigMap (ctx context.Context , configMapName string ) (* scheduler.SchedulerPolicy , error ) {
409- policyConfigMap , err := kubeclient .Get (ctx ).CoreV1 ().ConfigMaps (system .Namespace ()).Get (ctx , configMapName , metav1.GetOptions {})
410- if err != nil {
411- return nil , fmt .Errorf ("couldn't get scheduler policy config map %s/%s: %v" , system .Namespace (), configMapName , err )
412- }
413-
414- logger := logging .FromContext (ctx ).
415- Desugar ().
416- With (zap .String ("configmap" , configMapName ))
417- policy := & scheduler.SchedulerPolicy {}
418-
419- preds , found := policyConfigMap .Data ["predicates" ]
420- if ! found {
421- return nil , fmt .Errorf ("missing policy config map %s/%s value at key predicates" , system .Namespace (), configMapName )
422- }
423- if err := json .NewDecoder (strings .NewReader (preds )).Decode (& policy .Predicates ); err != nil {
424- return nil , fmt .Errorf ("invalid policy %v: %v" , preds , err )
425- }
426-
427- priors , found := policyConfigMap .Data ["priorities" ]
428- if ! found {
429- return nil , fmt .Errorf ("missing policy config map value at key priorities" )
430- }
431- if err := json .NewDecoder (strings .NewReader (priors )).Decode (& policy .Priorities ); err != nil {
432- return nil , fmt .Errorf ("invalid policy %v: %v" , preds , err )
433- }
434-
435- if errs := validatePolicy (policy ); errs != nil {
436- return nil , multierr .Combine (err )
437- }
438-
439- logger .Info ("Schedulers policy registration" , zap .Any ("policy" , policy ))
440-
441- return policy , nil
442- }
443-
444- func validatePolicy (policy * scheduler.SchedulerPolicy ) []error {
445- var validationErrors []error
446-
447- for _ , priority := range policy .Priorities {
448- if priority .Weight < scheduler .MinWeight || priority .Weight > scheduler .MaxWeight {
449- validationErrors = append (validationErrors , fmt .Errorf ("priority %s should have a positive weight applied to it or it has overflown" , priority .Name ))
450- }
451- }
452- return validationErrors
453- }
0 commit comments