diff --git a/cmd/scheduler/app/options/options_test.go b/cmd/scheduler/app/options/options_test.go index a2d5259d..1d014b34 100644 --- a/cmd/scheduler/app/options/options_test.go +++ b/cmd/scheduler/app/options/options_test.go @@ -38,10 +38,11 @@ var ( FLOAT64_3 = 3.0 FLOAT64_10 = 10.0 - INT32_0 = int32(0) - INT32_10 = int32(10) - INT32_20 = int32(20) - INT32_40 = int32(40) + INT32_0 = int32(0) + INT32_10 = int32(10) + INT32_20 = int32(20) + INT32_40 = int32(40) + INT32_1000 = int32(1000) INT64_1 = int64(1) INT64_2 = int64(2) @@ -204,6 +205,7 @@ func TestLoadFileV1beta1(t *testing.T) { }, }, }, + ExpectedThroughput: &INT32_1000, PercentageOfNodesToScore: &INT32_0, IncreasedPercentageOfNodesToScore: &INT32_0, DisablePreemption: &FALSE, diff --git a/pkg/features/godel_features.go b/pkg/features/godel_features.go index 3f3cf9cb..0302e290 100644 --- a/pkg/features/godel_features.go +++ b/pkg/features/godel_features.go @@ -84,6 +84,12 @@ const ( // // Allows to trigger resource reservation in Godel. ResourceReservation featuregate.Feature = "ResourceReservation" + + // owner: @libing.binacs + // alpha: for now + // + // support skipping filtering unchanged nodes. 
+ SkipFilteringUnchangedNodes featuregate.Feature = "SkipFilteringUnchangedNodes" ) func init() { @@ -104,4 +110,5 @@ var defaultKubernetesFeatureGates = map[featuregate.Feature]featuregate.FeatureS EnableColocation: {Default: false, PreRelease: featuregate.Alpha}, SupportRescheduling: {Default: false, PreRelease: featuregate.Alpha}, ResourceReservation: {Default: false, PreRelease: featuregate.Alpha}, + SkipFilteringUnchangedNodes: {Default: false, PreRelease: featuregate.Alpha}, } diff --git a/pkg/framework/api/common_state.go b/pkg/framework/api/common_state.go index 46b5f06e..fc83a189 100644 --- a/pkg/framework/api/common_state.go +++ b/pkg/framework/api/common_state.go @@ -30,6 +30,7 @@ const ( NodePartitionTypeStateKey = "NodePartitionType" PodLauncherStateKey = "PodLauncher" PodResourceTypeStateKey = "PodResourceType" + PodSchedulingCtxKey = "PodSchedulingCtx" PodTraceStateKey = "PodTrace" NodeGroupStateKey = "NodeGroup" PotentialVictimsKey = "PotentialVictims" @@ -45,6 +46,7 @@ const ( NodePartitionTypeMissedErrorString = "failed to get NodePartitionType, supposed to be set in cycle state" PodLauncherMissedErrorString = "failed to get PodLauncher, supposed to be set in cycle state" PodResourceTypeMissingErrorString = "failed to get PodResourceType, supposed to be set in cycle state" + PodSchedulingCtxMissingErrorString = "failed to get PodSchedulingCtx, supposed to be set in cycle state" PodTraceMissingErrorString = "failed to get PodTrace, supposed to be set in cycle state" NodeGroupMissedErrorString = "failed to get NodeGroup, supposed to be set in cycle state" @@ -91,6 +93,17 @@ func SetPodLauncherState(podLauncher podutil.PodLauncher, state *CycleState) err return podutil.PodLauncherUnsupportError } +func SetPodSchedulingCtxKey(schedulingCtx *PodSchedulingCtx, state *CycleState) error { + if schedulingCtx == nil { + schedulingCtx = &PodSchedulingCtx{} + } + data := &stateData{ + data: schedulingCtx, + } + state.Write(PodSchedulingCtxKey, data) + return 
nil +} + func SetPodTrace(podTrace tracing.SchedulingTrace, state *CycleState) error { data := &stateData{ data: podTrace, @@ -242,6 +255,17 @@ func GetPodLauncher(state *CycleState) (podutil.PodLauncher, error) { return "", PodLauncherMissedError } +var PodSchedulingCtxMissingError = fmt.Errorf(PodSchedulingCtxMissingErrorString) + +func GetPodSchedulingCtx(state *CycleState) (*PodSchedulingCtx, error) { + if data, err := state.Read(PodSchedulingCtxKey); err == nil { + if s, ok := data.(*stateData); ok { + return s.data.(*PodSchedulingCtx), nil + } + } + return nil, PodSchedulingCtxMissingError +} + var PodTraceMissingError = fmt.Errorf(PodTraceMissingErrorString) func GetPodTrace(state *CycleState) (tracing.SchedulingTrace, error) { diff --git a/pkg/framework/api/types.go b/pkg/framework/api/types.go index 3e6c2974..7c30f326 100644 --- a/pkg/framework/api/types.go +++ b/pkg/framework/api/types.go @@ -99,6 +99,10 @@ type ClusterEvent struct { // RecorderFactory builds an EventRecorder for a given scheduler name. type RecorderFactory func(string) events.EventRecorder +type PodSchedulingCtx struct { + NodeStoreGeneration int64 +} + // QueuedPodInfo is a Pod wrapper with additional information related to // the pod's status in the scheduling queue, such as the timestamp when // it's added to the queue. @@ -121,6 +125,8 @@ type QueuedPodInfo struct { // takes more time. InitialPreemptAttemptTimestamp time.Time + SchedulingCtx *PodSchedulingCtx + // Fields below are only used by binder, need to move this out of QueuedPodInfo and put them into BinderQueuedPodInfo // TODO: (liumeng) implement BinderQueuedPodInfo ? 
// assumedPod diff --git a/pkg/scheduler/apis/config/defaults.go b/pkg/scheduler/apis/config/defaults.go index 0a07cb4a..f0ea7a0e 100644 --- a/pkg/scheduler/apis/config/defaults.go +++ b/pkg/scheduler/apis/config/defaults.go @@ -150,6 +150,10 @@ func SetDefaults_GodelSchedulerConfiguration(obj *GodelSchedulerConfiguration) { // We got SubClusterName "" as default. obj.DefaultProfile = &GodelSchedulerProfile{} } + if obj.DefaultProfile.ExpectedThroughput == nil { + expectedThroughput := int32(DefaultExpectedThroughput) + obj.DefaultProfile.ExpectedThroughput = &expectedThroughput + } if obj.DefaultProfile.PercentageOfNodesToScore == nil { percentageOfNodesToScore := int32(DefaultPercentageOfNodesToScore) obj.DefaultProfile.PercentageOfNodesToScore = &percentageOfNodesToScore diff --git a/pkg/scheduler/apis/config/types.go b/pkg/scheduler/apis/config/types.go index 1f96347f..78314975 100644 --- a/pkg/scheduler/apis/config/types.go +++ b/pkg/scheduler/apis/config/types.go @@ -62,6 +62,8 @@ const ( ) const ( + DefaultExpectedThroughput = 0 + // DefaultPercentageOfNodesToScore defines the percentage of nodes of all nodes // that once found feasible, the scheduler stops looking for more nodes. // A value of 0 means adaptive, meaning the scheduler figures out a proper default. @@ -203,6 +205,8 @@ type GodelSchedulerProfile struct { // for that preemption plugin. PreemptionPluginConfigs []PluginConfig + ExpectedThroughput *int32 + // TODO: reserve temporarily(godel). 
// PercentageOfNodesToScore is the percentage of all nodes that once found feasible // for running a pod, the scheduler stops its search for more feasible nodes in diff --git a/pkg/scheduler/apis/config/v1beta1/defaults.go b/pkg/scheduler/apis/config/v1beta1/defaults.go index 73282078..1fc17919 100644 --- a/pkg/scheduler/apis/config/v1beta1/defaults.go +++ b/pkg/scheduler/apis/config/v1beta1/defaults.go @@ -150,6 +150,10 @@ func SetDefaults_GodelSchedulerConfiguration(obj *GodelSchedulerConfiguration) { // We got SubClusterName "" as default. obj.DefaultProfile = &GodelSchedulerProfile{} } + if obj.DefaultProfile.ExpectedThroughput == nil { + expectedThroughput := int32(config.DefaultExpectedThroughput) + obj.DefaultProfile.ExpectedThroughput = &expectedThroughput + } if obj.DefaultProfile.PercentageOfNodesToScore == nil { percentageOfNodesToScore := int32(config.DefaultPercentageOfNodesToScore) obj.DefaultProfile.PercentageOfNodesToScore = &percentageOfNodesToScore diff --git a/pkg/scheduler/apis/config/v1beta1/types.go b/pkg/scheduler/apis/config/v1beta1/types.go index dfe9ac5e..b2328b76 100644 --- a/pkg/scheduler/apis/config/v1beta1/types.go +++ b/pkg/scheduler/apis/config/v1beta1/types.go @@ -161,6 +161,8 @@ type GodelSchedulerProfile struct { // for that preemption plugin. PreemptionPluginConfigs []config.PluginConfig `json:"preemptionPluginConfigs,omitempty"` + ExpectedThroughput *int32 `json:"expectedThroughput,omitempty"` + // TODO: reserve temporarily(godel). 
// PercentageOfNodesToScore is the percentage of all nodes that once found feasible // for running a pod, the scheduler stops its search for more feasible nodes in diff --git a/pkg/scheduler/apis/config/v1beta1/zz_generated.conversion.go b/pkg/scheduler/apis/config/v1beta1/zz_generated.conversion.go index 9b816f99..ba3b89aa 100644 --- a/pkg/scheduler/apis/config/v1beta1/zz_generated.conversion.go +++ b/pkg/scheduler/apis/config/v1beta1/zz_generated.conversion.go @@ -146,6 +146,7 @@ func autoConvert_v1beta1_GodelSchedulerProfile_To_config_GodelSchedulerProfile(i out.BasePluginsForNM = (*config.Plugins)(unsafe.Pointer(in.BasePluginsForNM)) out.PluginConfigs = *(*[]config.PluginConfig)(unsafe.Pointer(&in.PluginConfigs)) out.PreemptionPluginConfigs = *(*[]config.PluginConfig)(unsafe.Pointer(&in.PreemptionPluginConfigs)) + out.ExpectedThroughput = (*int32)(unsafe.Pointer(in.ExpectedThroughput)) out.PercentageOfNodesToScore = (*int32)(unsafe.Pointer(in.PercentageOfNodesToScore)) out.IncreasedPercentageOfNodesToScore = (*int32)(unsafe.Pointer(in.IncreasedPercentageOfNodesToScore)) out.DisablePreemption = (*bool)(unsafe.Pointer(in.DisablePreemption)) @@ -171,6 +172,7 @@ func autoConvert_config_GodelSchedulerProfile_To_v1beta1_GodelSchedulerProfile(i out.BasePluginsForNM = (*config.Plugins)(unsafe.Pointer(in.BasePluginsForNM)) out.PluginConfigs = *(*[]config.PluginConfig)(unsafe.Pointer(&in.PluginConfigs)) out.PreemptionPluginConfigs = *(*[]config.PluginConfig)(unsafe.Pointer(&in.PreemptionPluginConfigs)) + out.ExpectedThroughput = (*int32)(unsafe.Pointer(in.ExpectedThroughput)) out.PercentageOfNodesToScore = (*int32)(unsafe.Pointer(in.PercentageOfNodesToScore)) out.IncreasedPercentageOfNodesToScore = (*int32)(unsafe.Pointer(in.IncreasedPercentageOfNodesToScore)) out.DisablePreemption = (*bool)(unsafe.Pointer(in.DisablePreemption)) diff --git a/pkg/scheduler/apis/config/v1beta1/zz_generated.deepcopy.go b/pkg/scheduler/apis/config/v1beta1/zz_generated.deepcopy.go index 
112325b1..4a497a26 100644 --- a/pkg/scheduler/apis/config/v1beta1/zz_generated.deepcopy.go +++ b/pkg/scheduler/apis/config/v1beta1/zz_generated.deepcopy.go @@ -107,6 +107,11 @@ func (in *GodelSchedulerProfile) DeepCopyInto(out *GodelSchedulerProfile) { (*in)[i].DeepCopyInto(&(*out)[i]) } } + if in.ExpectedThroughput != nil { + in, out := &in.ExpectedThroughput, &out.ExpectedThroughput + *out = new(int32) + **out = **in + } if in.PercentageOfNodesToScore != nil { in, out := &in.PercentageOfNodesToScore, &out.PercentageOfNodesToScore *out = new(int32) diff --git a/pkg/scheduler/apis/config/validation/validation.go b/pkg/scheduler/apis/config/validation/validation.go index 0c6f7382..587d8398 100644 --- a/pkg/scheduler/apis/config/validation/validation.go +++ b/pkg/scheduler/apis/config/validation/validation.go @@ -141,6 +141,10 @@ func ValidateSubClusterArgs(cc *config.GodelSchedulerProfile, fldPath *field.Pat errs = append(errs, ValidateBasePluginsConfiguration(cc.BasePluginsForNM, field.NewPath("baseNMPlugins"))...) errs = append(errs, ValidatePluginArgsConfiguration(cc.PluginConfigs, field.NewPath("pluginConfig"))...) 
+ if cc.ExpectedThroughput != nil && *cc.ExpectedThroughput < 0 { + errs = append(errs, field.Invalid(field.NewPath("expectedThroughput"), + cc.ExpectedThroughput, "not in valid range [0-INF]")) + } if cc.PercentageOfNodesToScore != nil && (*cc.PercentageOfNodesToScore < 0 || *cc.PercentageOfNodesToScore > 100) { errs = append(errs, field.Invalid(field.NewPath("percentageOfNodesToScore"), cc.PercentageOfNodesToScore, "not in valid range [0-100]")) diff --git a/pkg/scheduler/apis/config/zz_generated.deepcopy.go b/pkg/scheduler/apis/config/zz_generated.deepcopy.go index 2aecfa3f..cc8f1646 100644 --- a/pkg/scheduler/apis/config/zz_generated.deepcopy.go +++ b/pkg/scheduler/apis/config/zz_generated.deepcopy.go @@ -107,6 +107,11 @@ func (in *GodelSchedulerProfile) DeepCopyInto(out *GodelSchedulerProfile) { (*in)[i].DeepCopyInto(&(*out)[i]) } } + if in.ExpectedThroughput != nil { + in, out := &in.ExpectedThroughput, &out.ExpectedThroughput + *out = new(int32) + **out = **in + } if in.PercentageOfNodesToScore != nil { in, out := &in.PercentageOfNodesToScore, &out.PercentageOfNodesToScore *out = new(int32) diff --git a/pkg/scheduler/cache/snapshot.go b/pkg/scheduler/cache/snapshot.go index 60cf4c58..e1825283 100644 --- a/pkg/scheduler/cache/snapshot.go +++ b/pkg/scheduler/cache/snapshot.go @@ -24,6 +24,7 @@ import ( framework "github.com/kubewharf/godel-scheduler/pkg/framework/api" "github.com/kubewharf/godel-scheduler/pkg/scheduler/cache/commonstores" nodestore "github.com/kubewharf/godel-scheduler/pkg/scheduler/cache/commonstores/node_store" + "github.com/kubewharf/godel-scheduler/pkg/util/generationstore" ) // Snapshot is a snapshot of s NodeInfo and NodeTree order. 
The scheduler takes a @@ -62,6 +63,10 @@ func NewEmptySnapshot(handler commoncache.CacheHandler) *Snapshot { return s } +func (s *Snapshot) GetNodeStoreGeneration() int64 { + return s.CommonStoresSwitch.Find(nodestore.Name).(*nodestore.NodeStore).Store.(generationstore.RawStore).GetGeneration() +} + func (s *Snapshot) MakeBasicNodeGroup() framework.NodeGroup { nodeCircle := framework.NewNodeCircle(framework.DefaultNodeCircleName, s) nodeGroup := framework.NewNodeGroup(framework.DefaultNodeGroupName, s, []framework.NodeCircle{nodeCircle}) diff --git a/pkg/scheduler/core/pod_scheduler/pod_scheduler.go b/pkg/scheduler/core/pod_scheduler/pod_scheduler.go index 1d210b3f..e90c35cf 100644 --- a/pkg/scheduler/core/pod_scheduler/pod_scheduler.go +++ b/pkg/scheduler/core/pod_scheduler/pod_scheduler.go @@ -19,6 +19,7 @@ package podscheduler import ( "context" "fmt" + "math" "math/rand" "strconv" "sync" @@ -27,6 +28,7 @@ import ( v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/util/clock" + utilfeature "k8s.io/apiserver/pkg/util/feature" "k8s.io/client-go/informers" clientset "k8s.io/client-go/kubernetes" corelisters "k8s.io/client-go/listers/core/v1" @@ -37,6 +39,7 @@ import ( godelclient "github.com/kubewharf/godel-scheduler-api/pkg/client/clientset/versioned" crdinformers "github.com/kubewharf/godel-scheduler-api/pkg/client/informers/externalversions" commonstore "github.com/kubewharf/godel-scheduler/pkg/common/store" + godelfeatures "github.com/kubewharf/godel-scheduler/pkg/features" framework "github.com/kubewharf/godel-scheduler/pkg/framework/api" "github.com/kubewharf/godel-scheduler/pkg/framework/api/config" schedulerconfig "github.com/kubewharf/godel-scheduler/pkg/scheduler/apis/config" @@ -44,16 +47,27 @@ import ( "github.com/kubewharf/godel-scheduler/pkg/scheduler/cache/isolatedcache" "github.com/kubewharf/godel-scheduler/pkg/scheduler/core" schedulerframework "github.com/kubewharf/godel-scheduler/pkg/scheduler/framework" + preemptionplugins 
"github.com/kubewharf/godel-scheduler/pkg/scheduler/framework/preemption-plugins" "github.com/kubewharf/godel-scheduler/pkg/scheduler/framework/runtime" - "github.com/kubewharf/godel-scheduler/pkg/scheduler/util" + "github.com/kubewharf/godel-scheduler/pkg/scheduler/metrics" schedulerutil "github.com/kubewharf/godel-scheduler/pkg/scheduler/util" + "github.com/kubewharf/godel-scheduler/pkg/util" + "github.com/kubewharf/godel-scheduler/pkg/util/adaptiveattenuation" "github.com/kubewharf/godel-scheduler/pkg/util/constraints" "github.com/kubewharf/godel-scheduler/pkg/util/helper" + metricsutil "github.com/kubewharf/godel-scheduler/pkg/util/metrics" "github.com/kubewharf/godel-scheduler/pkg/util/parallelize" podutil "github.com/kubewharf/godel-scheduler/pkg/util/pod" "github.com/kubewharf/godel-scheduler/pkg/util/tracing" ) +// TODO: revisit this trick number. +// +// Assuming that 600 nodes can be filtered within 1ms, then 600000 nodes can be filtered within 1s. +const FilterOpPerSecond float64 = 600000 + +var localIgnoredNodeStatus = framework.NewStatus(framework.Unschedulable, "node(s) are ignored because of cached results") + // podScheduler is the component managing cache such as node and pod info, and other configs sharing the same life cycle with scheduler type podScheduler struct { switchType framework.SwitchType @@ -72,6 +86,7 @@ type podScheduler struct { disablePreemption bool candidateSelectPolicy string betterSelectPolicies []string + expectedThroughput int32 percentageOfNodesToScore int32 increasedPercentageOfNodesToScore int32 @@ -482,7 +497,25 @@ func (gs *podScheduler) findFeasibleNodes( errCh := parallelize.NewErrorChannel() var statusesLock sync.Mutex - var feasibleNodesLen int32 + var feasibleNodesLen, filteredNodesCount, filteredUnchangedNodes int32 + + var paradigm adaptiveattenuation.AdaptiveAttenuationParadigm + if gs.expectedThroughput > 0 { + paradigm = adaptiveattenuation.NewSquareParadigm(float64(numNodesToFind), 
FilterOpPerSecond/float64(gs.expectedThroughput), 1) + } else { + paradigm = adaptiveattenuation.NewDefaultParadigm(float64(numNodesToFind)) + } + + skipFilteringUnchangedNodes := gs.skipFilteringUnchangedNodesForPod(ctx, pod, f) + schedulingCtx, _ := framework.GetPodSchedulingCtx(state) + lastSchedulingNodeGeneration := schedulingCtx.NodeStoreGeneration + defer func() { + if lastSchedulingNodeGeneration > 0 { + metrics.ObservePodFilteredUnchangedNodesPercentage(gs.subCluster, metricsutil.SwitchTypeToQos(gs.switchType), gs.SchedulerName(), math.Min(float64(filteredUnchangedNodes)*100.0/float64(filteredNodesCount), 100.0)) + } + // etc... + }() + ctx, cancel := context.WithCancel(ctx) defer cancel() @@ -494,6 +527,19 @@ func (gs *podScheduler) findFeasibleNodes( var fit bool var status *framework.Status + if lastSchedulingNodeGeneration >= nodeInfo.GetGeneration() { + atomic.AddInt32(&filteredUnchangedNodes, 1) + if skipFilteringUnchangedNodes { + // TODO: handleFilterResult(nodeInfo, false, framework.NewStatus(framework.Unschedulable, "node(s) are ignored because of cached results")) + statusesLock.Lock() + if !status.IsSuccess() { + statuses[nodeInfo.GetNodeName()] = localIgnoredNodeStatus + } + statusesLock.Unlock() + return + } + } + // TODO: revisit this. // ATTENTION: Read only without modifying the original cachedStatuses. 
if cachedStatus, ok := cachedStatuses[nodeInfo.GetNodeName()]; ok && cachedStatus.IsSuccess() { @@ -511,10 +557,7 @@ func (gs *podScheduler) findFeasibleNodes( if fit { length := atomic.AddInt32(&feasibleNodesLen, 1) - if length > numNodesToFind { - cancel() - atomic.AddInt32(&feasibleNodesLen, -1) - } else { + if length <= numNodesToFind { feasibleNodes[length-1] = nodeInfo } } else { @@ -524,18 +567,26 @@ func (gs *podScheduler) findFeasibleNodes( } statusesLock.Unlock() } + + if y := paradigm.Predict(float64(atomic.AddInt32(&filteredNodesCount, 1))); atomic.LoadInt32(&feasibleNodesLen) >= int32(math.Ceil(y)) { + cancel() + } } // Stops searching for more nodes once the configured number of feasible nodes // are found. parallelize.Until(ctx, size, checkNode) - feasibleNodes = feasibleNodes[:feasibleNodesLen] + feasibleNodes = feasibleNodes[:util.MinInt32(feasibleNodesLen, numNodesToFind)] if err := errCh.ReceiveError(); err != nil { klog.ErrorS(err, "Failed to get feasible nodes", "pod", klog.KObj(pod)) return nil, err } + klog.InfoS("Finish findFeasibleNodes", "pod", podutil.GetPodKey(pod), + "numNodesToFind", numNodesToFind, "feasibleNodesLen", feasibleNodesLen, "filteredNodesCount", filteredNodesCount, + "paradigm", paradigm.Name(), "paradigmPredict", paradigm.Predict(float64(filteredNodesCount))) + return feasibleNodes, nil } @@ -579,6 +630,23 @@ func (gs *podScheduler) numFeasibleNodesToFind(numAllNodes int32, longRunningTas return expectedNodeCount } +func (gs *podScheduler) skipFilteringUnchangedNodesForPod(ctx context.Context, pod *v1.Pod, f framework.SchedulerFramework) bool { + // Do not skip nodes when the featuregate is not enabled. 
+ if !utilfeature.DefaultFeatureGate.Enabled(godelfeatures.SkipFilteringUnchangedNodes) { + return false + } + + // Do not skip nodes when the pod has cross node constraints + // TODO: Fine-grained condition checking + if f.HasCrossNodesConstraints(ctx, pod) { + return false + } + if !gs.disablePreemption && preemptionplugins.PodEligibleToPreemptOthers(pod, gs.pcLister) { + return false + } + return true +} + // prioritizeNodes prioritizes the nodes by running the score plugins, // which return a score for each node from the call to RunScorePlugins(). // The scores from each plugin are added together to make the score for that node, then @@ -874,6 +942,7 @@ func NewPodScheduler( disablePreemption bool, candidateSelectPolicy string, betterSelectPolicies []string, + expectedThroughput int32, percentageOfNodesToScore int32, increasedPercentageOfNodesToScore int32, basePlugins framework.PluginCollectionSet, @@ -891,6 +960,7 @@ func NewPodScheduler( crdInformerFactory: crdInformerFactory, snapshot: snapshot, disablePreemption: disablePreemption, + expectedThroughput: expectedThroughput, percentageOfNodesToScore: percentageOfNodesToScore, increasedPercentageOfNodesToScore: increasedPercentageOfNodesToScore, basePlugins: basePlugins, @@ -924,7 +994,7 @@ func NewPodScheduler( gs.pcLister = informerFactory.Scheduling().V1().PriorityClasses().Lister() gs.pvcLister = informerFactory.Core().V1().PersistentVolumeClaims().Lister() orderedPluginRegistry := schedulerframework.NewOrderedPluginRegistry() - gs.pluginOrder = util.GetListIndex(orderedPluginRegistry) + gs.pluginOrder = schedulerutil.GetListIndex(orderedPluginRegistry) gs.betterSelectPoliciesRegistry = map[string]betterSelectPolicy{ schedulerconfig.BetterPreemptionPolicyAscending: gs.ascendingOrderPreemption, diff --git a/pkg/scheduler/core/pod_scheduler/pod_scheduler_test.go b/pkg/scheduler/core/pod_scheduler/pod_scheduler_test.go index fc9de581..de4fce23 100644 --- 
a/pkg/scheduler/core/pod_scheduler/pod_scheduler_test.go +++ b/pkg/scheduler/core/pod_scheduler/pod_scheduler_test.go @@ -424,6 +424,7 @@ func TestScheduleInSpecificNodeGroup(t *testing.T) { nodeGroup := snapshot.MakeBasicNodeGroup() state := framework.NewCycleState() framework.SetPodResourceTypeState(podutil.GuaranteedPod, state) + framework.SetPodSchedulingCtxKey(&framework.PodSchedulingCtx{}, state) framework.SetPodTrace(&tracing.NoopSchedulingTrace{}, state) if tt.schedulingStagesState != nil { constructCycleStateSkipSpecificStage(state, framework.SchedulingStagesState(*tt.schedulingStagesState)) diff --git a/pkg/scheduler/core/pod_scheduler/preemption.go b/pkg/scheduler/core/pod_scheduler/preemption.go index 21c68041..8ef5507f 100644 --- a/pkg/scheduler/core/pod_scheduler/preemption.go +++ b/pkg/scheduler/core/pod_scheduler/preemption.go @@ -48,6 +48,7 @@ import ( "github.com/kubewharf/godel-scheduler/pkg/scheduler/framework/plugins/volumebinding" preemption "github.com/kubewharf/godel-scheduler/pkg/scheduler/framework/preemption-plugins" preemptionplugins "github.com/kubewharf/godel-scheduler/pkg/scheduler/framework/preemption-plugins" + frameworkruntime "github.com/kubewharf/godel-scheduler/pkg/scheduler/framework/runtime" "github.com/kubewharf/godel-scheduler/pkg/scheduler/metrics" "github.com/kubewharf/godel-scheduler/pkg/util" diff --git a/pkg/scheduler/core/types.go b/pkg/scheduler/core/types.go index aca16081..dddf5944 100644 --- a/pkg/scheduler/core/types.go +++ b/pkg/scheduler/core/types.go @@ -64,7 +64,7 @@ type UnitScheduler interface { type SchedulerHooks interface { PodScheduler() PodScheduler EventRecorder() events.EventRecorder - BootstrapSchedulePod(ctx context.Context, pod *v1.Pod, podTrace tracing.SchedulingTrace, nodeGroup string) (string, framework.SchedulerFramework, framework.SchedulerPreemptionFramework, *framework.CycleState, error) + BootstrapSchedulePod(ctx context.Context, pod *v1.Pod, schedulingCtx *framework.PodSchedulingCtx, 
podTrace tracing.SchedulingTrace, nodeGroup string) (string, framework.SchedulerFramework, framework.SchedulerPreemptionFramework, *framework.CycleState, error) ReservePod(ctx context.Context, clonedPod *v1.Pod, scheduleResult PodScheduleResult) (string, error) } @@ -160,6 +160,7 @@ func TransferToUnitResult(unitInfo *SchedulingUnitInfo, details *interpretabity. type RunningUnitInfo struct { QueuedPodInfo *framework.QueuedPodInfo Trace tracing.SchedulingTrace + SchedulingCtx *framework.PodSchedulingCtx // clonedPod is used to store those changes to the original pods in the workflow // e.g. span initialization, reservation info, preemption info ... diff --git a/pkg/scheduler/core/unit_scheduler/unit_scheduler.go b/pkg/scheduler/core/unit_scheduler/unit_scheduler.go index 5bf44543..241762d2 100644 --- a/pkg/scheduler/core/unit_scheduler/unit_scheduler.go +++ b/pkg/scheduler/core/unit_scheduler/unit_scheduler.go @@ -171,7 +171,7 @@ func (gs *unitScheduler) EventRecorder() events.EventRecorder { return gs.Recorder } -func (gs *unitScheduler) BootstrapSchedulePod(ctx context.Context, pod *v1.Pod, podTrace tracing.SchedulingTrace, nodeGroup string) (string, framework.SchedulerFramework, framework.SchedulerPreemptionFramework, *framework.CycleState, error) { +func (gs *unitScheduler) BootstrapSchedulePod(ctx context.Context, pod *v1.Pod, schedulingCtx *framework.PodSchedulingCtx, podTrace tracing.SchedulingTrace, nodeGroup string) (string, framework.SchedulerFramework, framework.SchedulerPreemptionFramework, *framework.CycleState, error) { godelScheduler, switchType, subCluster := gs.Scheduler, gs.switchType, gs.subCluster if err := podPassesBasicChecks(pod, gs.pvcLister); err != nil { @@ -197,6 +197,11 @@ func (gs *unitScheduler) BootstrapSchedulePod(ctx context.Context, pod *v1.Pod, } state.SetRecordPluginMetrics(true) + if err = framework.SetPodSchedulingCtxKey(schedulingCtx, state); err != nil { + klog.ErrorS(err, "Fail to set pod schedulingCtx context map", 
"switchType", switchType, "subCluster", subCluster, "pod", podutil.GetPodKey(pod)) + return "", nil, nil, nil, err + } + if err = framework.SetPodTrace(podTrace, state); err != nil { klog.ErrorS(err, "Fail to set pod tracing context map", "switchType", switchType, "subCluster", subCluster, "pod", podutil.GetPodKey(pod)) return "", nil, nil, nil, err @@ -518,6 +523,7 @@ func (gs *unitScheduler) constructRunningUnitInfo(ctx context.Context, unit fram QueuedPodInfo: podInfo, ClonedPod: getAndInitClonedPod(podTrace.GetRootSpanContext(), podInfo), Trace: podTrace, + SchedulingCtx: podInfo.SchedulingCtx, } allMember++ } @@ -592,6 +598,7 @@ func (gs *unitScheduler) handleSchedulingUnitFailure(ctx context.Context, result } } // 2. refresh the pod info + nodeStoreGeneration := gs.Snapshot.GetNodeStoreGeneration() podInfos := unitInfo.QueuedUnitInfo.GetPods() for i := range podInfos { podInfo := podInfos[i] @@ -605,6 +612,7 @@ func (gs *unitScheduler) handleSchedulingUnitFailure(ctx context.Context, result } else { // refresh queue span podInfo.QueueSpan = tracing.NewSpanInfo(podInfo.GetPodProperty().ConvertToTracingTags()) + podInfo.SchedulingCtx = &framework.PodSchedulingCtx{NodeStoreGeneration: nodeStoreGeneration} if err == nil { podInfo.Pod = cachedPod.DeepCopy() } diff --git a/pkg/scheduler/core/unit_scheduler/unit_scheduler_test.go b/pkg/scheduler/core/unit_scheduler/unit_scheduler_test.go index 81c707d6..39d63867 100644 --- a/pkg/scheduler/core/unit_scheduler/unit_scheduler_test.go +++ b/pkg/scheduler/core/unit_scheduler/unit_scheduler_test.go @@ -1070,6 +1070,7 @@ func TestScheduleUnitInNodeGroup_SinglePod(t *testing.T) { tt.disablePreemption, config.CandidateSelectPolicyBest, []string{config.BetterPreemptionPolicyDichotomy}, + 0, 100, 100, basePlugins, @@ -2148,6 +2149,7 @@ func TestScheduleUnitInNodeGroup_PodGroup(t *testing.T) { false, config.CandidateSelectPolicyBest, []string{config.BetterPreemptionPolicyAscending}, + 0, 100, 100, basePlugins, @@ -4374,6 
+4376,7 @@ func TestScheduleUnit_Rescheduling(t *testing.T) { disablePreemption, config.CandidateSelectPolicyBest, []string{config.BetterPreemptionPolicyAscending}, + 0, 100, 100, basePlugins, @@ -5154,6 +5157,7 @@ func TestScheduleUnit_ReschedulingWithPreemption(t *testing.T) { false, config.CandidateSelectPolicyBest, []string{config.BetterPreemptionPolicyAscending}, + 0, 100, 100, basePlugins, diff --git a/pkg/scheduler/framework/unit_runtime/unit_framework.go b/pkg/scheduler/framework/unit_runtime/unit_framework.go index 31445c0c..af3246a0 100644 --- a/pkg/scheduler/framework/unit_runtime/unit_framework.go +++ b/pkg/scheduler/framework/unit_runtime/unit_framework.go @@ -395,7 +395,7 @@ func (f *UnitFramework) scheduleOneUnitInstance(ctx context.Context, scheduledIn podTrace := runningUnitInfo.Trace scheduleTraceContext := podTrace.GetTraceContext(tracing.SchedulerSchedulePodSpan) - podKey, fwk, _, state, err := f.schedulerHooks.BootstrapSchedulePod(ctx, clonedPod, podTrace, nodeGroup.GetKey()) + podKey, fwk, _, state, err := f.schedulerHooks.BootstrapSchedulePod(ctx, clonedPod, runningUnitInfo.SchedulingCtx, podTrace, nodeGroup.GetKey()) if err != nil { errMessage := fmt.Sprintf("Failed to initialize pod: %v in node group: %v", err.Error(), nodeGroup.GetKey()) f.schedulerHooks.EventRecorder().Eventf(clonedPod, nil, v1.EventTypeWarning, "FailToInitializePod", core.ReturnAction, helper.TruncateMessage(errMessage)) @@ -510,7 +510,7 @@ func (f *UnitFramework) preemptOneUnitInstance(ctx context.Context, scheduledInd podTrace := runningUnitInfo.Trace preemptionTraceContext := podTrace.GetTraceContext(tracing.SchedulerPreemptPodSpan) - podKey, fwk, pfwk, state, err := f.schedulerHooks.BootstrapSchedulePod(ctx, clonedPod, podTrace, nodeGroup.GetKey()) + podKey, fwk, pfwk, state, err := f.schedulerHooks.BootstrapSchedulePod(ctx, clonedPod, runningUnitInfo.SchedulingCtx, podTrace, nodeGroup.GetKey()) if err != nil { errMessage := fmt.Sprintf("Failed to initialize pod: %v 
in node group: %v", err.Error(), nodeGroup.GetKey()) f.schedulerHooks.EventRecorder().Eventf(clonedPod, nil, v1.EventTypeWarning, "FailToInitializePod", core.ReturnAction, helper.TruncateMessage(errMessage)) diff --git a/pkg/scheduler/metrics/pod_metrics.go b/pkg/scheduler/metrics/pod_metrics.go index 7b3d2739..93842554 100644 --- a/pkg/scheduler/metrics/pod_metrics.go +++ b/pkg/scheduler/metrics/pod_metrics.go @@ -130,6 +130,15 @@ var ( StabilityLevel: metrics.ALPHA, }, []string{pkgmetrics.QosLabel, pkgmetrics.SubClusterLabel, pkgmetrics.SchedulerLabel}) + schedulingPodFilteredUnchangedNodesPercentage = metrics.NewHistogramVec( + &metrics.HistogramOpts{ + Subsystem: SchedulerSubsystem, + Name: "filteredunchangednodes_percentage", + Help: "Percentage of unchanged nodes when pod filtering", + Buckets: metrics.LinearBuckets(0, 5, 20), + StabilityLevel: metrics.ALPHA, + }, []string{pkgmetrics.QosLabel, pkgmetrics.SubClusterLabel, pkgmetrics.SchedulerLabel}) + schedulingAlgorithmDuration = metrics.NewHistogramVec( &metrics.HistogramOpts{ Subsystem: SchedulerSubsystem, @@ -469,6 +478,18 @@ func SchedulingUpdateSnapshotDurationObserve(basicLabels metrics.Labels, duratio newSchedulingUpdateSnapshotDurationObserverMetric(basicLabels).Observe(duration) } +// newSchedulingPodFilteredUnchangedNodesPercentageMetric returns the ObserverMetric for given labels by SchedulingPodFilteredUnchangedNodesPercentage +func newSchedulingPodFilteredUnchangedNodesPercentageMetric(labels metrics.Labels) metrics.ObserverMetric { + setScheduler(labels) + return schedulingPodFilteredUnchangedNodesPercentage.With(labels) +} + +// SchedulingPodFilteredUnchangedNodesPercentage Invoke Observe method +// basicLabels contains basic object property labels +func SchedulingPodFilteredUnchangedNodesPercentage(basicLabels metrics.Labels, percentage float64) { + newSchedulingPodFilteredUnchangedNodesPercentageMetric(basicLabels).Observe(percentage) +} + // newSchedulingAlgorithmDurationObserverMetric returns the
ObserverMetric for given labels by SchedulingAlgorithmDuration func newSchedulingAlgorithmDurationObserverMetric(labels metrics.Labels) metrics.ObserverMetric { setScheduler(labels) diff --git a/pkg/scheduler/metrics/profile_metrics.go b/pkg/scheduler/metrics/profile_metrics.go index 5fb8ba50..1d0cfb36 100644 --- a/pkg/scheduler/metrics/profile_metrics.go +++ b/pkg/scheduler/metrics/profile_metrics.go @@ -114,6 +114,14 @@ func ObserveUpdateSnapshotAttemptAndLatency(subCluster, qos, scheduler string, d }, duration) } +func ObservePodFilteredUnchangedNodesPercentage(subCluster, qos, scheduler string, percentage float64) { + SchedulingPodFilteredUnchangedNodesPercentage(k8smetrics.Labels{ + metrics.SubClusterLabel: subCluster, + metrics.QosLabel: qos, + metrics.SchedulerLabel: scheduler, + }, percentage) +} + func ObservePodEvaluatedNodes(subCluster, qos, scheduler string, count float64) { podEvaluatedNodes.With(k8smetrics.Labels{ metrics.SubClusterLabel: subCluster, diff --git a/pkg/scheduler/metrics/register.go b/pkg/scheduler/metrics/register.go index 1b831129..ab8a3178 100644 --- a/pkg/scheduler/metrics/register.go +++ b/pkg/scheduler/metrics/register.go @@ -37,6 +37,7 @@ var metricsList = []metrics.Registerable{ queueSortingLatency, podSchedulingStageDuration, schedulingUpdateSnapshotDuration, + schedulingPodFilteredUnchangedNodesPercentage, schedulingAlgorithmDuration, preemptingEvaluationDuration, preemptingEvaluationQuantile, diff --git a/pkg/scheduler/options.go b/pkg/scheduler/options.go index ee9c33ab..ff50db89 100644 --- a/pkg/scheduler/options.go +++ b/pkg/scheduler/options.go @@ -80,6 +80,7 @@ func renderOptions(opts ...Option) schedulerOptions { } type subClusterConfig struct { + ExpectedThroughput int32 PercentageOfNodesToScore int32 IncreasedPercentageOfNodesToScore int32 @@ -106,6 +107,9 @@ func (c *subClusterConfig) complete(profile *config.GodelSchedulerProfile) { if profile == nil { return } + if profile.ExpectedThroughput != nil { + 
c.ExpectedThroughput = *profile.ExpectedThroughput + } if profile.PercentageOfNodesToScore != nil { c.PercentageOfNodesToScore = *profile.PercentageOfNodesToScore } @@ -165,6 +169,7 @@ func (c *subClusterConfig) Equal(other *subClusterConfig) bool { func newDefaultSubClusterConfig(profile *config.GodelSchedulerProfile) *subClusterConfig { c := &subClusterConfig{ + ExpectedThroughput: config.DefaultExpectedThroughput, PercentageOfNodesToScore: config.DefaultPercentageOfNodesToScore, IncreasedPercentageOfNodesToScore: config.DefaultIncreasedPercentageOfNodesToScore, @@ -195,6 +200,7 @@ func newDefaultSubClusterConfig(profile *config.GodelSchedulerProfile) *subClust func newSubClusterConfigFromDefaultConfig(profile *config.GodelSchedulerProfile, defaultConfig *subClusterConfig) *subClusterConfig { c := &subClusterConfig{ + ExpectedThroughput: defaultConfig.ExpectedThroughput, PercentageOfNodesToScore: defaultConfig.PercentageOfNodesToScore, IncreasedPercentageOfNodesToScore: defaultConfig.IncreasedPercentageOfNodesToScore, diff --git a/pkg/scheduler/options_test.go b/pkg/scheduler/options_test.go index 7a0a8184..a293fd97 100644 --- a/pkg/scheduler/options_test.go +++ b/pkg/scheduler/options_test.go @@ -196,6 +196,7 @@ func TestLoadAndRenderFileV1beta1(t *testing.T) { // DefaultProfile { expectedProfile := &subClusterConfig{ + ExpectedThroughput: 1000, PercentageOfNodesToScore: 0, MaxWaitingDeletionDuration: 120, @@ -230,6 +231,7 @@ func TestLoadAndRenderFileV1beta1(t *testing.T) { name := "subCluster priorityqueue" subClusterProfile := newSubClusterConfigFromDefaultConfig(getSubClusterProfile(cfg.ComponentConfig, name), defaultProfile) expectedProfile := &subClusterConfig{ + ExpectedThroughput: 1000, PercentageOfNodesToScore: 0, MaxWaitingDeletionDuration: 120, @@ -264,6 +266,7 @@ func TestLoadAndRenderFileV1beta1(t *testing.T) { name := "subCluster blockqueue" subClusterProfile := newSubClusterConfigFromDefaultConfig(getSubClusterProfile(cfg.ComponentConfig, name), 
defaultProfile) expectedProfile := &subClusterConfig{ + ExpectedThroughput: 1000, PercentageOfNodesToScore: 0, MaxWaitingDeletionDuration: 120, @@ -296,6 +299,7 @@ func TestLoadAndRenderFileV1beta1(t *testing.T) { name := "subCluster different percentageOfNodesToScore & unitInitialBackoffSeconds & unitMaxBackoffSeconds" subClusterProfile := newSubClusterConfigFromDefaultConfig(getSubClusterProfile(cfg.ComponentConfig, name), defaultProfile) expectedProfile := &subClusterConfig{ + ExpectedThroughput: 1000, PercentageOfNodesToScore: 20, IncreasedPercentageOfNodesToScore: 40, @@ -389,6 +393,7 @@ func TestLoadAndRenderFileV1beta1(t *testing.T) { name := "subCluster 1" subClusterProfile := newSubClusterConfigFromDefaultConfig(getSubClusterProfile(cfg.ComponentConfig, name), defaultProfile) expectedProfile := &subClusterConfig{ + ExpectedThroughput: 1000, PercentageOfNodesToScore: 0, MaxWaitingDeletionDuration: 300, @@ -416,6 +421,7 @@ func TestLoadAndRenderFileV1beta1(t *testing.T) { // Finally revisit DefaultProfile again, it should NOT be affected by the configuration rendering of the sub-cluster. 
{ expectedProfile := &subClusterConfig{ + ExpectedThroughput: 1000, PercentageOfNodesToScore: 0, MaxWaitingDeletionDuration: 120, diff --git a/pkg/scheduler/queue/util.go b/pkg/scheduler/queue/util.go index ba14ae89..770d74f6 100644 --- a/pkg/scheduler/queue/util.go +++ b/pkg/scheduler/queue/util.go @@ -41,6 +41,7 @@ func newQueuedPodInfo(pod *v1.Pod, clock util.Clock) *framework.QueuedPodInfo { InitialAttemptTimestamp: now, QueueSpan: tracing.NewSpanInfo(framework.ExtractPodProperty(pod).ConvertToTracingTags()), OwnerReferenceKey: podutil.GetPodTemplateKey(pod), + // SchedulingCtx: &framework.PodSchedulingCtx{}, // TODO: Unit Level SchedulingCtx } } diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go index a2545a66..5647f66f 100644 --- a/pkg/scheduler/scheduler.go +++ b/pkg/scheduler/scheduler.go @@ -269,6 +269,7 @@ func (sched *Scheduler) createDataSet(idx int, subCluster string, switchType fra subClusterConfig.DisablePreemption, subClusterConfig.CandidatesSelectPolicy, subClusterConfig.BetterSelectPolicies, + subClusterConfig.ExpectedThroughput, subClusterConfig.PercentageOfNodesToScore, subClusterConfig.IncreasedPercentageOfNodesToScore, subClusterConfig.BasePlugins, diff --git a/pkg/util/adaptiveattenuation/adaptiveattenuation.go b/pkg/util/adaptiveattenuation/adaptiveattenuation.go new file mode 100644 index 00000000..68363bd3 --- /dev/null +++ b/pkg/util/adaptiveattenuation/adaptiveattenuation.go @@ -0,0 +1,57 @@ +package adaptiveattenuation + +type AdaptiveAttenuationParadigm interface { + Name() string + Predict(float64) float64 +} + +// ------------------------------------- DefaultAdaptiveAttenuationParadigm ------------------------------------- + +type DefaultAdaptiveAttenuationParadigm struct { + B float64 +} + +var _ AdaptiveAttenuationParadigm = &DefaultAdaptiveAttenuationParadigm{} + +func NewDefaultParadigm(b float64) AdaptiveAttenuationParadigm { + return &DefaultAdaptiveAttenuationParadigm{b} +} + +func (p 
*DefaultAdaptiveAttenuationParadigm) Name() string { + return "default" +} + +func (p *DefaultAdaptiveAttenuationParadigm) Predict(x float64) float64 { + return p.B +} + +// ------------------------------------- SquareAdaptiveAttenuationParadigm ------------------------------------- + +// Y = -1 * A * X^2 + B +type SquareAdaptiveAttenuationParadigm struct { + A, B, x1, y1 float64 +} + +var _ AdaptiveAttenuationParadigm = &SquareAdaptiveAttenuationParadigm{} + +func NewSquareParadigm(b, x1, y1 float64) AdaptiveAttenuationParadigm { + var a float64 + if b <= y1 { + a = 0 + y1 = b // ATTENTION + } else { + a = (b - y1) / (x1 * x1) + } + return &SquareAdaptiveAttenuationParadigm{a, b, x1, y1} +} + +func (p *SquareAdaptiveAttenuationParadigm) Name() string { + return "square" +} + +func (p *SquareAdaptiveAttenuationParadigm) Predict(x float64) float64 { + if x >= p.x1 { + return p.y1 + } + return -1*p.A*x*x + p.B +} diff --git a/pkg/util/adaptiveattenuation/adaptiveattenuation_test.go b/pkg/util/adaptiveattenuation/adaptiveattenuation_test.go new file mode 100644 index 00000000..b9b98a9b --- /dev/null +++ b/pkg/util/adaptiveattenuation/adaptiveattenuation_test.go @@ -0,0 +1,115 @@ +package adaptiveattenuation + +import ( + "reflect" + "testing" +) + +func TestDefaultAdaptiveAttenuationParadigm_Predict(t *testing.T) { + type fields struct { + B float64 + } + tests := []struct { + name string + fields fields + args []float64 + want []float64 + }{ + { + name: "B=51", + fields: fields{51}, + args: []float64{10, 100, 300, 500, 1000, 5000, 10000}, + }, + { + name: "B=11", + fields: fields{11}, + args: []float64{10, 100, 300, 500, 1000, 5000, 10000}, + }, + { + name: "B=2001", + fields: fields{2001}, + args: []float64{10, 100, 300, 500, 1000, 5000, 10000}, + }, + { + name: "B=2001", + fields: fields{2001}, + args: []float64{10, 100, 300, 500, 1000, 5000, 10000}, + }, + { + name: "B=1001", + fields: fields{1001}, + args: []float64{10, 100, 300, 500, 1000, 5000, 10000}, + }, + } 
+ for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + p := NewDefaultParadigm(tt.fields.B) + + for _, x := range tt.args { + got := p.Predict(x) + if got != tt.fields.B { + t.Errorf("AdaptiveAttenuationParadigm.Predict(%v) = %v, want %v", x, got, tt.want) + } + } + }) + } +} + +func TestSquareAdaptiveAttenuationParadigm_Predict(t *testing.T) { + type fields struct { + B float64 + x1 float64 + y1 float64 + } + tests := []struct { + name string + fields fields + args []float64 + want []float64 + }{ + { + name: "B=51, x1=500, y1=1", + fields: fields{51, 500, 1}, + args: []float64{10, 100, 300, 500, 1000, 5000, 10000}, + want: []float64{50.98, 49, 33, 1, 1, 1, 1}, + }, + { + name: "B=11, x1=1000, y1=1", + fields: fields{11, 1000, 1}, + args: []float64{10, 100, 300, 500, 1000, 5000, 10000}, + want: []float64{10.999, 10.9, 10.1, 8.5, 1, 1, 1}, + }, + { + name: "B=2001, x1=500, y1=1", + fields: fields{2001, 500, 1}, + args: []float64{10, 100, 300, 500, 1000, 5000, 10000}, + want: []float64{2000.2, 1921, 1281, 1, 1, 1, 1}, + }, + { + name: "B=2001, x1=500, y1=2011", + fields: fields{2001, 500, 2011}, + args: []float64{10, 100, 300, 500, 1000, 5000, 10000}, + want: []float64{2001, 2001, 2001, 2001, 2001, 2001, 2001}, + }, + { + name: "B=1001, x1=500, y1=2011", + fields: fields{1001, 500, 2011}, + args: []float64{10, 100, 300, 500, 1000, 5000, 10000}, + want: []float64{1001, 1001, 1001, 1001, 1001, 1001, 1001}, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + p := NewSquareParadigm(tt.fields.B, tt.fields.x1, tt.fields.y1) + + got := []float64{} + for _, x := range tt.args { + got = append(got, p.Predict(x)) + } + + if !reflect.DeepEqual(got, tt.want) { + t.Errorf("AdaptiveAttenuationParadigm.Predict() = %v, want %v", got, tt.want) + } + }) + } +} diff --git a/pkg/util/utils.go b/pkg/util/utils.go index dcf2a7d9..7ce1af9a 100644 --- a/pkg/util/utils.go +++ b/pkg/util/utils.go @@ -489,3 +489,10 @@ func RemoveLabels(labels1, labels2 
map[string]string) map[string]string { } return labels } + +func MinInt32(a, b int32) int32 { + if a > b { + return b + } + return a +} diff --git a/test/static/scheduler_config_v1beta1.yaml b/test/static/scheduler_config_v1beta1.yaml index f4e3d431..cb14ac4a 100644 --- a/test/static/scheduler_config_v1beta1.yaml +++ b/test/static/scheduler_config_v1beta1.yaml @@ -27,6 +27,7 @@ defaultProfile: attemptImpactFactorOnPriority: 3.0 # This should be 10 by default disablePreemption: false # This should be true by default blockQueue: false + expectedThroughput: 1000 # This should be 0 by default percentageOfNodesToScore: 0 increasedPercentageOfNodesToScore: 0 unitInitialBackoffSeconds: 1 # This should be 10 by default