
Commit 44f1238

make scheduling logic independent from API types

1 parent 60becc4 commit 44f1238

File tree

5 files changed, +672 -203 lines changed

api/clusters/v1alpha1/constants/reasons.go

Lines changed: 2 additions & 0 deletions

@@ -13,4 +13,6 @@ const (
 	ReasonInternalError = "InternalError"
 	// ReasonPreemptiveRequest indicates that the ClusterRequest is preemptive and AccessRequests referencing it are denied.
 	ReasonPreemptiveRequest = "PreemptiveRequest"
+	// ReasonSchedulingFailed indicates that there was a problem with scheduling a request.
+	ReasonSchedulingFailed = "SchedulingFailed"
 )
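The new constant is consumed by the scheduler controller further down this page. As a minimal sketch of the pattern used there (the helper name is illustrative, not part of the commit):

package scheduler

import (
	"fmt"

	errutils "github.com/openmcp-project/controller-utils/pkg/errors"

	cconst "github.com/openmcp-project/openmcp-operator/api/clusters/v1alpha1/constants"
)

// schedulingFailed is a hypothetical helper showing how ReasonSchedulingFailed
// is attached to errors; the controller diff below inlines the same pattern.
func schedulingFailed(request string, err error) errutils.ReasonableError {
	return errutils.WithReason(fmt.Errorf("error scheduling request '%s': %w", request, err), cconst.ReasonSchedulingFailed)
}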

internal/config/config_scheduler.go

Lines changed: 4 additions & 3 deletions

@@ -42,9 +42,10 @@ const (
 type Strategy string
 
 const (
-	STRATEGY_BALANCED Strategy = "Balanced"
-	STRATEGY_RANDOM   Strategy = "Random"
-	STRATEGY_SIMPLE   Strategy = "Simple"
+	STRATEGY_BALANCED_IGNORE_EMPTY Strategy = "BalancedIgnoreEmpty"
+	STRATEGY_BALANCED              Strategy = "Balanced"
+	STRATEGY_RANDOM                Strategy = "Random"
+	STRATEGY_SIMPLE                Strategy = "Simple"
 )
 
 type ClusterDefinition struct {
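The new STRATEGY_BALANCED_IGNORE_EMPTY constant is handed to the controller via strategy.FromConfig (see the diff below); its implementation lives in the new strategy package, which this page does not show. Going only by the name and by the removed Balanced logic further down (pick the candidate with the lowest tenancy count), a plausible sketch might look like this:

package strategy

import (
	clustersv1alpha1 "github.com/openmcp-project/openmcp-operator/api/clusters/v1alpha1"
)

// pickBalancedIgnoreEmpty is a speculative sketch, NOT the committed code:
// balance by tenancy count, but prefer clusters that already have tenants,
// falling back to all candidates if every cluster is empty.
func pickBalancedIgnoreEmpty(clusters []*clustersv1alpha1.Cluster) *clustersv1alpha1.Cluster {
	candidates := make([]*clustersv1alpha1.Cluster, 0, len(clusters))
	for _, c := range clusters {
		if c.GetTenancyCount() > 0 {
			candidates = append(candidates, c)
		}
	}
	if len(candidates) == 0 {
		candidates = clusters
	}
	var best *clustersv1alpha1.Cluster
	for _, c := range candidates {
		if best == nil || c.GetTenancyCount() < best.GetTenancyCount() {
			best = c
		}
	}
	return best // nil if no candidates were given
}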

internal/controllers/scheduler/controller.go

Lines changed: 48 additions & 200 deletions
@@ -3,12 +3,12 @@ package scheduler
 import (
 	"context"
 	"fmt"
-	"math/rand/v2"
 	"slices"
 	"strings"
 
 	apierrors "k8s.io/apimachinery/pkg/api/errors"
 	"k8s.io/apimachinery/pkg/util/sets"
+	"k8s.io/utils/ptr"
 	ctrl "sigs.k8s.io/controller-runtime"
 	"sigs.k8s.io/controller-runtime/pkg/client"
 	"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"

@@ -20,11 +20,13 @@ import (
 	ctrlutils "github.com/openmcp-project/controller-utils/pkg/controller"
 	errutils "github.com/openmcp-project/controller-utils/pkg/errors"
 	"github.com/openmcp-project/controller-utils/pkg/logging"
+	"github.com/openmcp-project/controller-utils/pkg/pairs"
 
 	clustersv1alpha1 "github.com/openmcp-project/openmcp-operator/api/clusters/v1alpha1"
 	cconst "github.com/openmcp-project/openmcp-operator/api/clusters/v1alpha1/constants"
 	apiconst "github.com/openmcp-project/openmcp-operator/api/constants"
 	"github.com/openmcp-project/openmcp-operator/internal/config"
+	"github.com/openmcp-project/openmcp-operator/internal/controllers/scheduler/strategy"
 )
 
 const ControllerName = "Scheduler"
@@ -177,62 +179,30 @@ func (r *ClusterScheduler) handleCreateOrUpdate(ctx context.Context, req reconci
 		return rr
 	}
 
-	// if no cluster was found, check if there is an existing cluster that qualifies for the request
-	// skip this check for preemptive requests with purposes with exclusive tenancy, as they will always result in a new cluster
-	if !(cDef.IsExclusive() && cr.Spec.Preemptive) { //nolint:staticcheck // QF1001
-		cluster, rerr = r.pickFittingCluster(ctx, cr, clusters, cDef)
-		if rerr != nil {
-			rr.ReconcileError = rerr
-			return rr
-		}
+	sr, err := Schedule(ctx, clusters, cDef, strategy.FromConfig(r.Config.Strategy), NewSchedulingRequest(string(cr.UID), cr.Spec.Preemptive, false, cr.Namespace, cr.Spec.Purpose))
+	if err != nil {
+		rr.ReconcileError = errutils.WithReason(fmt.Errorf("error scheduling request '%s': %w", req.String(), err), cconst.ReasonSchedulingFailed)
+		return rr
 	}
 
-	if cluster != nil {
-		log.Info("Existing cluster qualifies for request, using it", "clusterName", cluster.Name, "clusterNamespace", cluster.Namespace)
-
-		// patch finalizer into Cluster
-		oldCluster := cluster.DeepCopy()
-		fin := cr.FinalizerForCluster()
-		changed := controllerutil.AddFinalizer(cluster, fin) // should always be true at this point
-		replacedPreemptiveUID := ""
-		if !cr.Spec.Preemptive && !cDef.IsSharedUnlimitedly() {
-			// if the request is not preemptive, it can potentially replace a preemptive request that was already assigned to this cluster
-			// this check can be skipped for clusters which are shared unlimitedly, as the preemptive request would point to the same cluster again
-			suffix, removed := RemoveFinalizerWithPrefix(cluster, clustersv1alpha1.PreemptiveRequestFinalizerOnClusterPrefix)
-			replacedPreemptiveUID = suffix
-			if removed {
-				log.Info("Replacing preemptive request", "uid", replacedPreemptiveUID)
-				if rerr := r.ReschedulePreemptiveRequest(ctx, replacedPreemptiveUID); rerr != nil {
-					if rerr.Reason() == cconst.ReasonInternalError {
-						// if the problem is that the preemptive request was not found, only log an error
-						log.Error(rerr, "Error rescheduling preemptive request")
-					} else {
-						rr.ReconcileError = rerr
-						return rr
-					}
-				}
-			}
-		}
-		if changed {
-			log.Debug("Adding finalizer to cluster", "clusterName", cluster.Name, "clusterNamespace", cluster.Namespace, "finalizer", fin, "replacedPreemptiveUID", replacedPreemptiveUID)
-			if err := r.PlatformCluster.Client().Patch(ctx, cluster, client.MergeFrom(oldCluster)); err != nil {
-				rr.ReconcileError = errutils.WithReason(fmt.Errorf("error patching finalizer '%s' on cluster '%s/%s': %w", fin, cluster.Namespace, cluster.Name, err), cconst.ReasonPlatformClusterInteractionProblem)
-				return rr
-			}
-		}
-	} else {
-		cluster = r.initializeNewCluster(ctx, cr, cDef)
-
-		// create Cluster resource
-		if err := r.PlatformCluster.Client().Create(ctx, cluster); err != nil {
-			if apierrors.IsAlreadyExists(err) {
-				rr.ReconcileError = errutils.WithReason(fmt.Errorf("cluster '%s/%s' already exists, this is not supposed to happen", cluster.Namespace, cluster.Name), cconst.ReasonInternalError)
-				return rr
-			}
-			rr.ReconcileError = errutils.WithReason(fmt.Errorf("error creating cluster '%s/%s': %w", cluster.Namespace, cluster.Name, err), cconst.ReasonPlatformClusterInteractionProblem)
-			return rr
+	// apply required changes to the cluster
+	updated, err := sr.Apply(ctx, r.PlatformCluster.Client())
+	if err != nil {
+		rr.ReconcileError = errutils.WithReason(fmt.Errorf("error applying scheduling result for request '%s': %w", req.String(), err), cconst.ReasonPlatformClusterInteractionProblem)
+		return rr
+	}
+	// identify the cluster that was chosen for this request
+	for _, c := range updated {
+		if slices.Contains(c.Finalizers, cr.FinalizerForCluster()) {
+			cluster = c
+			log.Debug("Request has been scheduled", "clusterName", cluster.Name, "clusterNamespace", cluster.Namespace, "preemptive", cr.Spec.Preemptive)
+			break
 		}
 	}
+	if cluster == nil {
+		rr.ReconcileError = errutils.WithReason(fmt.Errorf("unable to determine cluster for request '%s', this should not happen", req.String()), cconst.ReasonSchedulingFailed)
+		return rr
+	}
 
 	// add cluster reference to request
 	rr.Object.Status.Cluster = &clustersv1alpha1.NamespacedObjectReference{}
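The pick-or-create logic that used to live inline is now split into two calls: Schedule computes a scheduling result from the candidate clusters, the cluster definition, a strategy, and a plain scheduling request, and sr.Apply performs the resulting API writes and returns the updated clusters; the chosen cluster is then identified by its finalizer. The lookup loop above could be factored into a helper like the following (hypothetical, not part of the commit):

package scheduler

import (
	"slices"

	clustersv1alpha1 "github.com/openmcp-project/openmcp-operator/api/clusters/v1alpha1"
)

// findScheduledCluster returns the cluster whose finalizers contain fin, or nil.
// It mirrors the loop in handleCreateOrUpdate above; the helper itself is
// illustrative only.
func findScheduledCluster(updated []*clustersv1alpha1.Cluster, fin string) *clustersv1alpha1.Cluster {
	for _, c := range updated {
		if slices.Contains(c.Finalizers, fin) {
			return c
		}
	}
	return nil
}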
@@ -369,72 +339,6 @@ func (r *ClusterScheduler) fetchRelevantClusters(ctx context.Context, cr *cluste
 	return clusters, nil
 }
 
-// initializeNewCluster creates a new Cluster resource based on the given ClusterRequest and ClusterDefinition.
-func (r *ClusterScheduler) initializeNewCluster(ctx context.Context, cr *clustersv1alpha1.ClusterRequest, cDef *config.ClusterDefinition) *clustersv1alpha1.Cluster {
-	log := logging.FromContextOrPanic(ctx)
-	purpose := cr.Spec.Purpose
-	cluster := &clustersv1alpha1.Cluster{}
-	// choose a name for the cluster
-	// priority as follows:
-	// - for singleton clusters (shared unlimited):
-	//   1. generateName of template
-	//   2. name of template
-	//   3. purpose
-	// - for exclusive clusters or shared limited:
-	//   1. generateName of template
-	//   2. purpose used as generateName
-	if cDef.IsSharedUnlimitedly() {
-		// there will only be one instance of this cluster
-		if cDef.Template.GenerateName != "" {
-			cluster.SetGenerateName(cDef.Template.GenerateName)
-		} else if cDef.Template.Name != "" {
-			cluster.SetName(cDef.Template.Name)
-		} else {
-			cluster.SetName(purpose)
-		}
-	} else {
-		// there might be multiple instances of this cluster
-		if cDef.Template.GenerateName != "" {
-			cluster.SetGenerateName(cDef.Template.GenerateName)
-		} else {
-			cluster.SetGenerateName(purpose + "-")
-		}
-	}
-	// choose a namespace for the cluster
-	// priority as follows:
-	// 1. namespace of template
-	// 2. namespace of request
-	if cDef.Template.Namespace != "" {
-		cluster.SetNamespace(cDef.Template.Namespace)
-	} else {
-		cluster.SetNamespace(cr.Namespace)
-	}
-	log.Info("Creating new cluster", "clusterName", cluster.Name, "clusterNamespace", cluster.Namespace, "preemptive", cr.Spec.Preemptive)
-
-	// set finalizer
-	cluster.SetFinalizers([]string{cr.FinalizerForCluster()})
-	// take over labels, annotations, and spec from the template
-	cluster.SetLabels(cDef.Template.Labels)
-	if err := ctrlutils.EnsureLabel(ctx, nil, cluster, clustersv1alpha1.DeleteWithoutRequestsLabel, "true", false); err != nil {
-		if !ctrlutils.IsMetadataEntryAlreadyExistsError(err) {
-			log.Error(err, "error setting label", "label", clustersv1alpha1.DeleteWithoutRequestsLabel, "value", "true")
-		}
-	}
-	cluster.SetAnnotations(cDef.Template.Annotations)
-	cluster.Spec = cDef.Template.Spec
-
-	// set purpose, if not set
-	if len(cluster.Spec.Purposes) == 0 {
-		cluster.Spec.Purposes = []string{purpose}
-	} else {
-		if !slices.Contains(cluster.Spec.Purposes, purpose) {
-			cluster.Spec.Purposes = append(cluster.Spec.Purposes, purpose)
-		}
-	}
-
-	return cluster
-}
-
 // TODO: move to controller-utils
 // RemoveFinalizerWithPrefix removes the first finalizer with the given prefix from the object.
 // The bool return value indicates whether a finalizer was removed.
@@ -460,10 +364,32 @@ func RemoveFinalizerWithPrefix(obj client.Object, prefix string) (string, bool)
 	return suffix, length != index
 }
 
+// TODO: move to controller-utils
+// GetAny returns an arbitrary key-value pair from the map as a pointer to a pairs.Pair.
+// If the map is empty, it returns nil.
+func GetAny[K comparable, V any](m map[K]V) *pairs.Pair[K, V] {
+	for k, v := range m {
+		return ptr.To(pairs.New(k, v))
+	}
+	return nil
+}
+
+// TODO: move to controller-utils
+// MapValues returns a slice of all values in the map.
+// The order is unspecified.
+// The values are not deep-copied, so changes to the values could affect the original map.
+func MapValues[K comparable, V any](m map[K]V) []V {
+	values := make([]V, 0, len(m))
+	for _, v := range m {
+		values = append(values, v)
+	}
+	return values
+}
+
 // ReschedulePreemptiveRequest fetches the ClusterRequest with the corresponding UID from the cluster and adds a reconcile annotation to it.
-func (r *ClusterScheduler) ReschedulePreemptiveRequest(ctx context.Context, uid string) errutils.ReasonableError {
+func ReschedulePreemptiveRequest(ctx context.Context, platformClient client.Client, uid string) errutils.ReasonableError {
 	crl := &clustersv1alpha1.ClusterRequestList{}
-	if err := r.PlatformCluster.Client().List(ctx, crl, client.MatchingLabelsSelector{Selector: r.Config.Selectors.Requests.Completed()}, client.MatchingFields{"spec.preemptive": "true"}); err != nil {
+	if err := platformClient.List(ctx, crl, client.MatchingFields{"spec.preemptive": "true"}); err != nil {
 		return errutils.WithReason(fmt.Errorf("error listing ClusterRequests: %w", err), cconst.ReasonPlatformClusterInteractionProblem)
 	}
 	var cr *clustersv1alpha1.ClusterRequest

@@ -476,86 +402,8 @@ func (r *ClusterScheduler) ReschedulePreemptiveRequest(ctx context.Context, uid
 	if cr == nil {
 		return errutils.WithReason(fmt.Errorf("unable to find preemptive ClusterRequest with UID '%s'", uid), cconst.ReasonInternalError)
 	}
-	if err := ctrlutils.EnsureAnnotation(ctx, r.PlatformCluster.Client(), cr, apiconst.OperationAnnotation, apiconst.OperationAnnotationValueReconcile, true); err != nil {
+	if err := ctrlutils.EnsureAnnotation(ctx, platformClient, cr, apiconst.OperationAnnotation, apiconst.OperationAnnotationValueReconcile, true); err != nil {
 		return errutils.WithReason(fmt.Errorf("error adding reconcile annotation to preemptive ClusterRequest '%s': %w", cr.Name, err), cconst.ReasonPlatformClusterInteractionProblem)
 	}
 	return nil
 }
-
-// pickFittingCluster gets a list of existing clusters that match the request's purpose and tries to pick one for the request.
-// If the returned cluster is nil, no fitting cluster was found. This can e.g. happen if all clusters are already at their tenancy limit.
-func (r *ClusterScheduler) pickFittingCluster(ctx context.Context, cr *clustersv1alpha1.ClusterRequest, clusters []*clustersv1alpha1.Cluster, cDef *config.ClusterDefinition) (*clustersv1alpha1.Cluster, errutils.ReasonableError) {
-	log := logging.FromContextOrPanic(ctx)
-	log.Debug("Checking for fitting clusters", "purpose", cr.Spec.Purpose, "tenancyCount", cDef.TenancyCount, "preemptive", cr.Spec.Preemptive)
-	// remove all clusters with a non-zero deletion timestamp from the list of candidates
-	clusters = filters.FilterSlice(clusters, func(args ...any) bool {
-		c, ok := args[0].(*clustersv1alpha1.Cluster)
-		if !ok {
-			return false
-		}
-		return c.DeletionTimestamp.IsZero()
-	})
-	// avoid picking clusters which are not ready if ready ones are available
-	readyClusters := filters.FilterSlice(clusters, func(args ...any) bool {
-		c, ok := args[0].(*clustersv1alpha1.Cluster)
-		if !ok {
-			return false
-		}
-		return c.Status.Phase == clustersv1alpha1.CLUSTER_PHASE_READY
-	})
-	if len(readyClusters) > 0 {
-		log.Debug("Excluding non-ready clusters from scheduling", "readyCount", len(readyClusters), "totalCount", len(clusters))
-		clusters = readyClusters
-	}
-	// unless the cluster template for the requested purpose allows unlimited sharing, filter out all clusters that are already at their tenancy limit
-	// for preemptive requests, other preemptive requests count towards the tenancy limit, for regular requests, preemptive requests are ignored
-	if cDef.IsExclusive() || cDef.TenancyCount > 0 {
-		clusters = filters.FilterSlice(clusters, func(args ...any) bool {
-			c, ok := args[0].(*clustersv1alpha1.Cluster)
-			if !ok {
-				return false
-			}
-			preTenCount := c.GetPreemptiveTenancyCount()
-			if cDef.IsExclusive() && preTenCount == 0 {
-				// for exclusive tenancy, take only clusters into account that have been preemptively requested
-				return false
-			}
-			tenCount := c.GetTenancyCount()
-			if cr.Spec.Preemptive {
-				// for preemptive requests, also take preemptive "workloads" into account
-				tenCount += preTenCount
-			}
-			return tenCount < cDef.TenancyCount
-		})
-	}
-	var cluster *clustersv1alpha1.Cluster
-	if len(clusters) == 1 {
-		cluster = clusters[0]
-		log.Debug("One existing cluster qualifies for request", "clusterName", cluster.Name, "clusterNamespace", cluster.Namespace, "preemptive", cr.Spec.Preemptive)
-	} else if len(clusters) > 0 {
-		log.Debug("Multiple existing clusters qualify for request, choosing one according to strategy", "strategy", string(r.Config.Strategy), "count", len(clusters), "preemptive", cr.Spec.Preemptive)
-		switch r.Config.Strategy {
-		case config.STRATEGY_SIMPLE:
-			cluster = clusters[0]
-		case config.STRATEGY_RANDOM:
-			cluster = clusters[rand.IntN(len(clusters))]
-		case "":
-			// default to balanced, if empty
-			fallthrough
-		case config.STRATEGY_BALANCED:
-			// find cluster with least number of requests
-			cluster = clusters[0]
-			count := cluster.GetTenancyCount()
-			for _, c := range clusters[1:] {
-				tmp := c.GetTenancyCount()
-				if tmp < count {
-					count = tmp
-					cluster = c
-				}
-			}
-		default:
-			return nil, errutils.WithReason(fmt.Errorf("unknown strategy '%s'", r.Config.Strategy), cconst.ReasonConfigurationProblem)
-		}
-	}
-	return cluster, nil
-}
