diff --git a/cmd/scheduler/main.go b/cmd/scheduler/main.go index 7c915be2c..27b938ce5 100644 --- a/cmd/scheduler/main.go +++ b/cmd/scheduler/main.go @@ -33,6 +33,7 @@ import ( "sigs.k8s.io/scheduler-plugins/pkg/podstate" "sigs.k8s.io/scheduler-plugins/pkg/preemptiontoleration" "sigs.k8s.io/scheduler-plugins/pkg/qos" + "sigs.k8s.io/scheduler-plugins/pkg/resourcepolicy" "sigs.k8s.io/scheduler-plugins/pkg/sysched" "sigs.k8s.io/scheduler-plugins/pkg/trimaran/loadvariationriskbalancing" "sigs.k8s.io/scheduler-plugins/pkg/trimaran/lowriskovercommitment" @@ -60,6 +61,7 @@ func main() { app.WithPlugin(lowriskovercommitment.Name, lowriskovercommitment.New), app.WithPlugin(sysched.Name, sysched.New), app.WithPlugin(peaks.Name, peaks.New), + app.WithPlugin(resourcepolicy.Name, resourcepolicy.New), // Sample plugins below. // app.WithPlugin(crossnodepreemption.Name, crossnodepreemption.New), app.WithPlugin(podstate.Name, podstate.New), diff --git a/go.mod b/go.mod index b7aac3395..c9533a858 100644 --- a/go.mod +++ b/go.mod @@ -3,12 +3,13 @@ module sigs.k8s.io/scheduler-plugins go 1.23.0 require ( + github.com/KunWuLuan/resourcepolicyapi v0.0.0-20250727121406-048ef1326d94 github.com/containers/common v0.46.0 github.com/diktyo-io/appgroup-api v1.0.1-alpha github.com/diktyo-io/networktopology-api v1.0.1-alpha github.com/dustin/go-humanize v1.0.1 github.com/go-logr/logr v1.4.2 - github.com/google/go-cmp v0.6.0 + github.com/google/go-cmp v0.7.0 github.com/k8stopologyawareschedwg/noderesourcetopology-api v0.1.2 github.com/k8stopologyawareschedwg/podfingerprint v0.2.2 github.com/patrickmn/go-cache v2.1.0+incompatible @@ -16,11 +17,11 @@ require ( github.com/spf13/pflag v1.0.5 github.com/stretchr/testify v1.9.0 gonum.org/v1/gonum v0.12.0 - k8s.io/api v0.32.5 - k8s.io/apimachinery v0.32.5 + k8s.io/api v0.33.1 + k8s.io/apimachinery v0.33.1 k8s.io/apiserver v0.32.5 - k8s.io/client-go v0.32.5 - k8s.io/code-generator v0.32.5 + k8s.io/client-go v0.33.1 + k8s.io/code-generator v0.33.0 k8s.io/component-base v0.32.5 k8s.io/component-helpers v0.32.5 k8s.io/klog/v2 v2.130.1 @@ -30,7 +31,7 @@ require ( sigs.k8s.io/controller-runtime v0.20.4 sigs.k8s.io/logtools v0.9.0 sigs.k8s.io/security-profiles-operator v0.4.0 - sigs.k8s.io/structured-merge-diff/v4 v4.4.2 + sigs.k8s.io/structured-merge-diff/v4 v4.6.0 sigs.k8s.io/yaml v1.4.0 ) @@ -65,7 +66,7 @@ require ( github.com/golang/protobuf v1.5.4 // indirect github.com/google/btree v1.1.3 // indirect github.com/google/cel-go v0.22.0 // indirect - github.com/google/gnostic-models v0.6.8 // indirect + github.com/google/gnostic-models v0.6.9 // indirect github.com/google/gofuzz v1.2.0 // indirect github.com/google/uuid v1.6.0 // indirect github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 // indirect @@ -107,22 +108,22 @@ require ( go.opentelemetry.io/proto/otlp v1.3.1 // indirect go.uber.org/multierr v1.11.0 // indirect go.uber.org/zap v1.27.0 // indirect - golang.org/x/crypto v0.31.0 // indirect + golang.org/x/crypto v0.36.0 // indirect golang.org/x/exp v0.0.0-20240719175910-8a7402abbf56 // indirect golang.org/x/mod v0.21.0 // indirect - golang.org/x/net v0.33.0 // indirect - golang.org/x/oauth2 v0.23.0 // indirect - golang.org/x/sync v0.10.0 // indirect - golang.org/x/sys v0.28.0 // indirect - golang.org/x/term v0.27.0 // indirect - golang.org/x/text v0.21.0 // indirect - golang.org/x/time v0.7.0 // indirect + golang.org/x/net v0.38.0 // indirect + golang.org/x/oauth2 v0.27.0 // indirect + golang.org/x/sync v0.12.0 // indirect + golang.org/x/sys v0.31.0 // indirect + 
golang.org/x/term v0.30.0 // indirect + golang.org/x/text v0.23.0 // indirect + golang.org/x/time v0.9.0 // indirect golang.org/x/tools v0.26.0 // indirect gomodules.xyz/jsonpatch/v2 v2.4.0 // indirect google.golang.org/genproto/googleapis/api v0.0.0-20240826202546-f6391c0de4c7 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20240826202546-f6391c0de4c7 // indirect google.golang.org/grpc v1.65.0 // indirect - google.golang.org/protobuf v1.35.1 // indirect + google.golang.org/protobuf v1.36.5 // indirect gopkg.in/evanphx/json-patch.v4 v4.12.0 // indirect gopkg.in/inf.v0 v0.9.1 // indirect gopkg.in/natefinch/lumberjack.v2 v2.2.1 // indirect @@ -133,13 +134,14 @@ require ( k8s.io/controller-manager v0.32.5 // indirect k8s.io/csi-translation-lib v0.32.5 // indirect k8s.io/dynamic-resource-allocation v0.32.5 // indirect - k8s.io/gengo/v2 v2.0.0-20240911193312-2b36238f13e9 // indirect + k8s.io/gengo/v2 v2.0.0-20250207200755-1244d31929d7 // indirect k8s.io/kms v0.32.5 // indirect - k8s.io/kube-openapi v0.0.0-20241105132330-32ad38e42d3f // indirect + k8s.io/kube-openapi v0.0.0-20250318190949-c8a335a9a2ff // indirect k8s.io/kubelet v0.32.5 // indirect k8s.io/metrics v0.32.5 // indirect sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.31.0 // indirect sigs.k8s.io/json v0.0.0-20241010143419-9aa6b5e7a4b3 // indirect + sigs.k8s.io/randfill v1.0.0 // indirect sigs.k8s.io/release-utils v0.3.0 // indirect ) @@ -165,10 +167,8 @@ replace ( k8s.io/kube-aggregator => k8s.io/kube-aggregator v0.32.5 k8s.io/kube-controller-manager => k8s.io/kube-controller-manager v0.32.5 k8s.io/kube-proxy => k8s.io/kube-proxy v0.32.5 - k8s.io/kube-scheduler => k8s.io/kube-scheduler v0.32.5 k8s.io/kubectl => k8s.io/kubectl v0.32.5 k8s.io/kubelet => k8s.io/kubelet v0.32.5 - k8s.io/kubernetes => k8s.io/kubernetes v1.32.5 k8s.io/legacy-cloud-providers => k8s.io/legacy-cloud-providers v0.31.8 k8s.io/metrics => k8s.io/metrics v0.32.5 k8s.io/mount-utils => k8s.io/mount-utils v0.32.5 @@ -181,3 +181,7 @@ replace k8s.io/externaljwt => k8s.io/externaljwt v0.32.5 replace k8s.io/sample-cli-plugin => k8s.io/sample-cli-plugin v0.32.5 replace k8s.io/sample-controller => k8s.io/sample-controller v0.32.5 + +replace k8s.io/kube-scheduler => k8s.io/kube-scheduler v0.32.5 + +replace k8s.io/kubernetes => k8s.io/kubernetes v1.32.5 diff --git a/go.sum b/go.sum index 32857b94e..d876c6237 100644 --- a/go.sum +++ b/go.sum @@ -1342,6 +1342,8 @@ github.com/BurntSushi/toml v1.3.2/go.mod h1:CxXYINrC8qIiEnFrOxCa7Jy5BFHlXnUU2pbi github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo= github.com/JeffAshton/win_pdh v0.0.0-20161109143554-76bb4ee9f0ab/go.mod h1:3VYc5hodBMJ5+l/7J4xAyMeuM2PNuepvHlGs8yilUCA= github.com/JohnCGriffin/overflow v0.0.0-20211019200055-46fa312c352c/go.mod h1:X0CRv0ky0k6m906ixxpzmDRLvX58TFUKS2eePweuyxk= +github.com/KunWuLuan/resourcepolicyapi v0.0.0-20250727121406-048ef1326d94 h1:XQQCH4Wf2n627vWxr7+uIKuHwzlQ5MFLZN/tD+jzwU4= +github.com/KunWuLuan/resourcepolicyapi v0.0.0-20250727121406-048ef1326d94/go.mod h1:kPff4zxO6pT4b/nJQYmMM2giPbVmi6FiE5pQTS8Ux34= github.com/MakeNowJust/heredoc v1.0.0/go.mod h1:mG5amYoWBHf8vpLOuehzbGGw0EHxpZZ6lCpQ4fNJ8LE= github.com/Microsoft/go-winio v0.4.11/go.mod h1:VhR8bwka0BXejwEJY73c50VrPtXAaKcyvVC4A4RozmA= github.com/Microsoft/go-winio v0.4.14/go.mod h1:qXqCSQ3Xa7+6tgxaGTIe4Kpcdsi+P8jBhyzoq1bpyYA= @@ -1914,8 +1916,9 @@ github.com/google/cel-go v0.22.0 h1:b3FJZxpiv1vTMo2/5RDUqAHPxkT8mmMfJIrq1llbf7g= github.com/google/cel-go 
v0.22.0/go.mod h1:BuznPXXfQDpXKWQ9sPW3TzlAJN5zzFe+i9tIs0yC4s8= github.com/google/flatbuffers v2.0.8+incompatible/go.mod h1:1AeVuKshWv4vARoZatz6mlQ0JxURH0Kv5+zNeJKJCa8= github.com/google/flatbuffers v23.5.26+incompatible/go.mod h1:1AeVuKshWv4vARoZatz6mlQ0JxURH0Kv5+zNeJKJCa8= -github.com/google/gnostic-models v0.6.8 h1:yo/ABAfM5IMRsS1VnXjTBvUb61tFIHozhlYvRgGre9I= github.com/google/gnostic-models v0.6.8/go.mod h1:5n7qKqH0f5wFt+aWF8CW6pZLLNOfYuF5OpfBSENuI8U= +github.com/google/gnostic-models v0.6.9 h1:MU/8wDLif2qCXZmzncUQ/BOfxWfthHi63KqpoNbWqVw= +github.com/google/gnostic-models v0.6.9/go.mod h1:CiWsm0s6BSQd1hRn8/QmxqB6BesYcbSZxsz9b0KuDBw= github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M= github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= @@ -1931,8 +1934,9 @@ github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/ github.com/google/go-cmp v0.5.7/go.mod h1:n+brtR0CgQNWTVd5ZUFpTBC8YFBDLK/h/bpaJ8/DtOE= github.com/google/go-cmp v0.5.8/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= -github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8= +github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU= github.com/google/go-github v17.0.0+incompatible/go.mod h1:zLgOLi98H3fifZn+44m+umXrS52loVEgC2AApnigrVQ= github.com/google/go-github/v33 v33.0.0/go.mod h1:GMdDnVZY/2TsWgp/lkYnpSAh6TrzhANBBwm6k6TTEXg= github.com/google/go-intervals v0.0.2/go.mod h1:MkaR3LNRfeKLPmqgJYs4E66z5InYjmCjbbr4TQlcT6Y= @@ -2848,8 +2852,8 @@ golang.org/x/crypto v0.24.0/go.mod h1:Z1PMYSOR5nyMcyAVAIQSKCDwalqy85Aqn1x3Ws4L5D golang.org/x/crypto v0.25.0/go.mod h1:T+wALwcMOSE0kXgUAnPAHqTLW+XHgcELELW8VaDgm/M= golang.org/x/crypto v0.26.0/go.mod h1:GY7jblb9wI+FOo5y8/S2oY4zWP07AkOJ4+jxCqdqn54= golang.org/x/crypto v0.28.0/go.mod h1:rmgy+3RHxRZMyY0jjAJShp2zgEdOqj2AO7U0pYmeQ7U= -golang.org/x/crypto v0.31.0 h1:ihbySMvVjLAeSH1IbfcRTkD/iNscyz8rGzjF/E5hV6U= -golang.org/x/crypto v0.31.0/go.mod h1:kDsLvtWBEx7MV9tJOj9bnXsPbxwJQ6csT/x4KIN4Ssk= +golang.org/x/crypto v0.36.0 h1:AnAEvhDddvBdpY+uR+MyHmuZzzNqXSe/GvuDeob5L34= +golang.org/x/crypto v0.36.0/go.mod h1:Y4J0ReaxCR1IMaabaSMugxJES1EpwhBHhv2bDHklZvc= golang.org/x/exp v0.0.0-20180321215751-8460e604b9de/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20180807140117-3d87b88a115f/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= @@ -3029,8 +3033,8 @@ golang.org/x/net v0.26.0/go.mod h1:5YKkiSynbBIh3p6iOc/vibscux0x38BZDkn8sCUPxHE= golang.org/x/net v0.27.0/go.mod h1:dDi0PyhWNoiUOrAS8uXv/vnScO4wnHQO4mj9fn/RytE= golang.org/x/net v0.28.0/go.mod h1:yqtgsTWOOnlGLG9GFRrK3++bGOUEkNBoHZc8MEDWPNg= golang.org/x/net v0.30.0/go.mod h1:2wGyMJ5iFasEhkwi13ChkO/t1ECNC4X4eBKkVFyYFlU= -golang.org/x/net v0.33.0 h1:74SYHlV8BIgHIFC/LrYkOGIwL19eTYXQ5wc6TBuO36I= -golang.org/x/net v0.33.0/go.mod h1:HXLR5J+9DxmrqMwG9qjGCxZ+zKXxBru04zlTvWlWuN4= +golang.org/x/net v0.38.0 h1:vRMAPTMaeGqVhG5QyLJHqNDwecKTomGeqbnfZyKlBI8= +golang.org/x/net v0.38.0/go.mod h1:ivrbrMbzFq5J41QOQh0siUuly180yBYtLp+CKbEaFx8= 
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20181017192945-9dcd33a902f4/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20181106182150-f42d05182288/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= @@ -3077,8 +3081,9 @@ golang.org/x/oauth2 v0.17.0/go.mod h1:OzPDGQiuQMguemayvdylqddI7qcD9lnSDb+1FiwQ5H golang.org/x/oauth2 v0.18.0/go.mod h1:Wf7knwG0MPoWIMMBgFlEaSUDaKskp0dCfrlJRJXbBi8= golang.org/x/oauth2 v0.20.0/go.mod h1:XYTD2NtWslqkgxebSiOHnXEap4TF09sJSc7H1sXbhtI= golang.org/x/oauth2 v0.21.0/go.mod h1:XYTD2NtWslqkgxebSiOHnXEap4TF09sJSc7H1sXbhtI= -golang.org/x/oauth2 v0.23.0 h1:PbgcYx2W7i4LvjJWEbf0ngHV6qJYr86PkAV3bXdLEbs= golang.org/x/oauth2 v0.23.0/go.mod h1:XYTD2NtWslqkgxebSiOHnXEap4TF09sJSc7H1sXbhtI= +golang.org/x/oauth2 v0.27.0 h1:da9Vo7/tDv5RH/7nZDz1eMGS/q1Vv1N/7FCrBhI9I3M= +golang.org/x/oauth2 v0.27.0/go.mod h1:onh5ek6nERTohokkhCD/y2cV4Do3fxFHFuAejCkRWT8= golang.org/x/perf v0.0.0-20180704124530-6e6d33e29852/go.mod h1:JLpeXjPJfIyPr5TlbXLkXWLhP8nz10XfvxElABhCtcw= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= @@ -3103,8 +3108,8 @@ golang.org/x/sync v0.5.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sync v0.6.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sync v0.8.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= -golang.org/x/sync v0.10.0 h1:3NQrjDixjgGwUOCaF8w2+VYHv0Ve/vGYSbdkTa98gmQ= -golang.org/x/sync v0.10.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= +golang.org/x/sync v0.12.0 h1:MHc5BpPuC30uJk597Ri8TV3CNZcTLu6B6z4lJy+g6Jw= +golang.org/x/sync v0.12.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA= golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= @@ -3266,8 +3271,8 @@ golang.org/x/sys v0.22.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/sys v0.23.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/sys v0.24.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/sys v0.26.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= -golang.org/x/sys v0.28.0 h1:Fksou7UEQUWlKvIdsqzJmUmCX3cZuD2+P3XyyzwMhlA= -golang.org/x/sys v0.28.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.31.0 h1:ioabZlmFYtWhL+TRYpcnNlLwhyxaM9kWTDEmfnprqik= +golang.org/x/sys v0.31.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k= golang.org/x/telemetry v0.0.0-20240228155512-f48c80bd79b2/go.mod h1:TeRTkGYfJXctD9OcfyVLyj2J3IxLnKwHJR8f4D8a3YE= golang.org/x/telemetry v0.0.0-20240521205824-bda55230c457/go.mod h1:pRgIJT+bRLFKnoM1ldnzKoxTIn14Yxz928LQRYYgIN0= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= @@ -3296,8 +3301,8 @@ golang.org/x/term v0.21.0/go.mod h1:ooXLefLobQVslOqselCNF4SxFAaoS6KujMbsGzSDmX0= golang.org/x/term v0.22.0/go.mod h1:F3qCibpT5AMpCRfhfT53vVJwhLtIVHhB9XDjfFvnMI4= golang.org/x/term v0.23.0/go.mod h1:DgV24QBUrK6jhZXl+20l6UWznPlwAHm1Q1mGHtydmSk= 
golang.org/x/term v0.25.0/go.mod h1:RPyXicDX+6vLxogjjRxjgD2TKtmAO6NZBsBRfrOLu7M= -golang.org/x/term v0.27.0 h1:WP60Sv1nlK1T6SupCHbXzSaN0b9wUmsPoRS9b61A23Q= -golang.org/x/term v0.27.0/go.mod h1:iMsnZpn0cago0GOrHO2+Y7u7JPn5AylBrcoWkElMTSM= +golang.org/x/term v0.30.0 h1:PQ39fJZ+mfadBm0y5WlL4vlM7Sx1Hgf13sMIY2+QS9Y= +golang.org/x/term v0.30.0/go.mod h1:NYYFdzHoI5wRh/h5tDMdMqCqPJZEuNqVR5xJLd/n67g= golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= @@ -3323,8 +3328,8 @@ golang.org/x/text v0.15.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= golang.org/x/text v0.16.0/go.mod h1:GhwF1Be+LQoKShO3cGOHzqOgRrGaYc9AvblQOmPVHnI= golang.org/x/text v0.17.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY= golang.org/x/text v0.19.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY= -golang.org/x/text v0.21.0 h1:zyQAAkrwaneQ066sspRyJaG9VNi/YJ1NfzcGB3hZ/qo= -golang.org/x/text v0.21.0/go.mod h1:4IBbMaMmOPCJ8SecivzSH54+73PCFmPWxNTLm+vZkEQ= +golang.org/x/text v0.23.0 h1:D71I7dUrlY+VX0gQShAThNGHFxZ13dGLBHQLVl1mJlY= +golang.org/x/text v0.23.0/go.mod h1:/BLNzu4aZCJ1+kcD0DNRotWKage4q2rGVAg4o22unh4= golang.org/x/time v0.0.0-20180412165947-fbb02b2291d2/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= @@ -3336,8 +3341,9 @@ golang.org/x/time v0.0.0-20220922220347-f3bd1da661af/go.mod h1:tRJNPiyCQ0inRvYxb golang.org/x/time v0.1.0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.3.0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.5.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM= -golang.org/x/time v0.7.0 h1:ntUhktv3OPE6TgYxXWv9vKvUSJyIFJlyohwbkEwPrKQ= golang.org/x/time v0.7.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM= +golang.org/x/time v0.9.0 h1:EsRrnYcQiGH+5FfbgvV4AP7qEZstoyrHB0DzarOQ4ZY= +golang.org/x/time v0.9.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM= golang.org/x/tools v0.0.0-20180221164845-07fd8470d635/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20180525024113-a5b4c53f6e8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20180828015842-6cd1fcedba52/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= @@ -3894,8 +3900,9 @@ google.golang.org/protobuf v1.32.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHh google.golang.org/protobuf v1.33.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos= google.golang.org/protobuf v1.34.1/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos= google.golang.org/protobuf v1.34.2/go.mod h1:qYOHts0dSfpeUzUFpOMr/WGzszTmLH+DiWniOlNbLDw= -google.golang.org/protobuf v1.35.1 h1:m3LfL6/Ca+fqnjnlqQXNpFPABW1UD7mjh8KO2mKFytA= google.golang.org/protobuf v1.35.1/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE= +google.golang.org/protobuf v1.36.5 h1:tPhr+woSbjfYvY6/GPufUoYizxw1cF/yFoxJ2fmpwlM= +google.golang.org/protobuf v1.36.5/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE= gopkg.in/airbrake/gobrake.v2 v2.0.9/go.mod h1:/h5ZAUhDkGaJfjzjKLSjv6zCL6O0LLBxU4K+aSYdM/U= gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod 
h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= @@ -3987,8 +3994,9 @@ k8s.io/endpointslice v0.32.5/go.mod h1:jDpGlovH8wjHNibzlMon738wvBIZxw7gS6ngyjvbY k8s.io/externaljwt v0.32.5/go.mod h1:P9TZ/u+o3CG//KNc/2HJmKgnuvawWS75IosS9dlGlxI= k8s.io/gengo v0.0.0-20230829151522-9cce18d56c01/go.mod h1:FiNAH4ZV3gBg2Kwh89tzAEV2be7d5xI0vBa/VySYy3E= k8s.io/gengo/v2 v2.0.0-20240826214909-a7b603a56eb7/go.mod h1:EJykeLsmFC60UQbYJezXkEsG2FLrt0GPNkU5iK5GWxU= -k8s.io/gengo/v2 v2.0.0-20240911193312-2b36238f13e9 h1:si3PfKm8dDYxgfbeA6orqrtLkvvIeH8UqffFJDl0bz4= k8s.io/gengo/v2 v2.0.0-20240911193312-2b36238f13e9/go.mod h1:EJykeLsmFC60UQbYJezXkEsG2FLrt0GPNkU5iK5GWxU= +k8s.io/gengo/v2 v2.0.0-20250207200755-1244d31929d7 h1:2OX19X59HxDprNCVrWi6jb7LW1PoqTlYqEq5H2oetog= +k8s.io/gengo/v2 v2.0.0-20250207200755-1244d31929d7/go.mod h1:EJykeLsmFC60UQbYJezXkEsG2FLrt0GPNkU5iK5GWxU= k8s.io/klog v0.3.0/go.mod h1:Gq+BEi5rUBO/HRz0bTSXDUcqjScdoY3a9IHpCEIOOfk= k8s.io/klog/v2 v2.0.0/go.mod h1:PBfzABfn139FHAV07az/IF9Wp1bkk3vpT2XSJ76fSDE= k8s.io/klog/v2 v2.2.0/go.mod h1:Od+F08eJP+W3HUb4pSrPpgp9DGU4GzlpG/TmITuYh/Y= @@ -4003,8 +4011,9 @@ k8s.io/kms v0.32.5/go.mod h1:Bk2evz/Yvk0oVrvm4MvZbgq8BD34Ksxs2SRHn4/UiOM= k8s.io/kube-aggregator v0.32.5/go.mod h1:YQfS6iOH6WsWDFyqbNI/J7M5Vf0cWqjaKLH9zGgYCuI= k8s.io/kube-controller-manager v0.32.5/go.mod h1:lXUDCoumjKXosoHGf9F1q8Qw2TPzteNVXqcro2NV5Is= k8s.io/kube-openapi v0.0.0-20231010175941-2dd684a91f00/go.mod h1:AsvuZPBlUDVuCdzJ87iajxtXuR9oktsTctW/R9wwouA= -k8s.io/kube-openapi v0.0.0-20241105132330-32ad38e42d3f h1:GA7//TjRY9yWGy1poLzYYJJ4JRdzg3+O6e8I+e+8T5Y= k8s.io/kube-openapi v0.0.0-20241105132330-32ad38e42d3f/go.mod h1:R/HEjbvWI0qdfb8viZUeVZm0X6IZnxAydC7YU42CMw4= +k8s.io/kube-openapi v0.0.0-20250318190949-c8a335a9a2ff h1:/usPimJzUKKu+m+TE36gUyGcf03XZEP0ZIKgKj35LS4= +k8s.io/kube-openapi v0.0.0-20250318190949-c8a335a9a2ff/go.mod h1:5jIi+8yX4RIb8wk3XwBo5Pq2ccx4FP10ohkbSKCZoK8= k8s.io/kube-proxy v0.32.5/go.mod h1:1FhoUGg/xu7FcxF7MWLpFzHP5si0gaRiF1SH8fg+L8E= k8s.io/kube-scheduler v0.32.5 h1:sPlell9o0IAD12PanESrWTUcZeUaz9lQMBzNZzZ66/8= k8s.io/kube-scheduler v0.32.5/go.mod h1:jTl+IZEujDywfLvcyHC6tOVQVz8vRct/vtoqikqsBms= @@ -4113,6 +4122,9 @@ sigs.k8s.io/kustomize/kyaml v0.18.1/go.mod h1:C3L2BFVU1jgcddNBE1TxuVLgS46TjObMwW sigs.k8s.io/logtools v0.9.0 h1:IMP/HTDkfM6rg6os/tcEjmQeIHyOyu/enduM/cOPGNQ= sigs.k8s.io/logtools v0.9.0/go.mod h1:74Z5BP7ehrMHi/Q31W1gSf8YgwT/4GPjVH5xPSPeZA0= sigs.k8s.io/mdtoc v1.1.0/go.mod h1:QZLVEdHH2iNIR4uHAZyvFRtjloHgVItk8lo/mzCtq3w= +sigs.k8s.io/randfill v0.0.0-20250304075658-069ef1bbf016/go.mod h1:XeLlZ/jmk4i1HRopwe7/aU3H5n1zNUcX6TM94b3QxOY= +sigs.k8s.io/randfill v1.0.0 h1:JfjMILfT8A6RbawdsK2JXGBR5AQVfd+9TbzrlneTyrU= +sigs.k8s.io/randfill v1.0.0/go.mod h1:XeLlZ/jmk4i1HRopwe7/aU3H5n1zNUcX6TM94b3QxOY= sigs.k8s.io/release-utils v0.2.0/go.mod h1:9O5livl2h3Q56jUkoZ7UnV22XVRB6MuD4l/51C2vAPg= sigs.k8s.io/release-utils v0.3.0 h1:cyNeXvm+2lPn67f4MWmq9xapZDAI5hekpT7iQPRxta4= sigs.k8s.io/release-utils v0.3.0/go.mod h1:J9xpziRNRI4mAeMZxPRryDodQMoMudMu6yC1aViFHU4= @@ -4120,8 +4132,9 @@ sigs.k8s.io/security-profiles-operator v0.4.0 h1:TwSTQqHYngd9EZFskV3M+sTnyj6aExA sigs.k8s.io/security-profiles-operator v0.4.0/go.mod h1:aqtxq1T5+UWQpFEsfGCiUnY4p+s2KZoqQMETkBDlrrc= sigs.k8s.io/structured-merge-diff/v4 v4.1.2/go.mod h1:j/nl6xW8vLS49O8YvXW1ocPhZawJtm+Yrr7PPRQ0Vg4= sigs.k8s.io/structured-merge-diff/v4 v4.2.3/go.mod h1:qjx8mGObPmV2aSZepjQjbmb2ihdVs8cGKBraizNC69E= 
-sigs.k8s.io/structured-merge-diff/v4 v4.4.2 h1:MdmvkGuXi/8io6ixD5wud3vOLwc1rj0aNqRlpuvjmwA= sigs.k8s.io/structured-merge-diff/v4 v4.4.2/go.mod h1:N8f93tFZh9U6vpxwRArLiikrE5/2tiu1w1AGfACIGE4= +sigs.k8s.io/structured-merge-diff/v4 v4.6.0 h1:IUA9nvMmnKWcj5jl84xn+T5MnlZKThmUW1TdblaLVAc= +sigs.k8s.io/structured-merge-diff/v4 v4.6.0/go.mod h1:dDy58f92j70zLsuZVuUX5Wp9vtxXpaZnkPGWeqDfCps= sigs.k8s.io/yaml v1.1.0/go.mod h1:UJmg0vDUVViEyp3mgSv9WPwZCDxu4rQW1olrI1uml+o= sigs.k8s.io/yaml v1.2.0/go.mod h1:yfXDCHCao9+ENCvLSE62v9VSji2MKu5jeNfTrofGhJc= sigs.k8s.io/yaml v1.4.0 h1:Mk1wCc2gy/F0THH0TAp1QYyJNzRm2KCLy3o5ASXVI5E= diff --git a/kep/594-resourcepolicy/README.md b/kep/594-resourcepolicy/README.md new file mode 100644 index 000000000..531f0ab94 --- /dev/null +++ b/kep/594-resourcepolicy/README.md @@ -0,0 +1,177 @@ +# Resource Policy + +## Table of Contents + + +- [Summary](#summary) +- [Motivation](#motivation) + - [Use Cases](#use-cases) + - [Goals](#goals) + - [Non-Goals](#non-goals) +- [Proposal](#proposal) + - [API](#api) + - [Implementation Details](#implementation-details) + - [PreFilter](#prefilter) + - [Filter](#filter) + - [Score](#score) + - [PreBind](#prebind) +- [Known limitations](#known-limitations) +- [Test plans](#test-plans) +- [Graduation criteria](#graduation-criteria) +- [Feature enablement and rollback](#feature-enablement-and-rollback) + + +## Summary +This proposal introduces a plugin that enables users to set priorities for different kinds of resources and to define maximum resource consumption limits for workloads on each kind. + +## Motivation +A Kubernetes cluster typically consists of heterogeneous machines, with SKUs that vary in CPU, memory, GPU, and price. To +efficiently utilize the different resources available in the cluster, users can set priorities for machines of different +types and configure resource allocations for different workloads. Additionally, they may prefer to delete pods running +on low-priority nodes rather than on high-priority ones. + +### Use Cases + +1. As an administrator of a Kubernetes cluster, I have some static but expensive VM instances and some dynamic but cheaper Spot +instances in my cluster. I want to restrict how much of each kind of resource different workloads can consume, in order to limit cost. +I also want important workloads to be deployed first on the static VM instances so that they do not risk being preempted, while the Pods that are scaled up during business peaks are deployed on the cheaper Spot +instances. At the end of the business peak, the Pods on Spot instances are scaled down first. + +### Goals + +1. Develop a filter plugin to restrict the resource consumption on each kind of resource for different workloads. +2. Develop a score plugin to favor nodes matched by a higher-priority kind of resource. +3. Automatically set deletion costs on Pods through a controller to control the scale-in order of workloads. + +### Non-Goals + +1. The scheduler will not delete pods.
+ +## Proposal + +### API +```yaml +apiVersion: scheduling.sigs.x-k8s.io/v1alpha1 +kind: ResourcePolicy +metadata: + name: xxx + namespace: xxx +spec: + matchLabelKeys: + - pod-template-hash + selector: + key1: value1 + strategy: prefer + units: + - name: unit1 + max: 10 + maxResources: + cpu: 10 + nodeSelector: + key1: value1 +``` + +```go +type ResourcePolicy struct { + metav1.TypeMeta `json:",inline"` + metav1.ObjectMeta `json:"metadata,omitempty"` + + Spec ResourcePolicySpec `json:"spec"` + Status ResourcePolicyStatus `json:"status,omitempty"` +} + +type ResourcePolicySpec struct { + // +optional + // +nullable + // +listType=atomic + Units []Unit `json:"units,omitempty" protobuf:"bytes,1,rep,name=units"` + + Selector map[string]string `json:"selector,omitempty" protobuf:"bytes,2,rep,name=selector"` + MatchLabelKeys []string `json:"matchLabelKeys,omitempty" protobuf:"bytes,3,rep,name=matchLabelKeys"` +} + +type Unit struct { + Max *int32 `json:"max,omitempty" protobuf:"varint,1,opt,name=max"` + MaxResources v1.ResourceList `json:"maxResources,omitempty" protobuf:"bytes,2,rep,name=maxResources"` + + NodeSelector map[string]string `json:"nodeSelector,omitempty" protobuf:"bytes,3,rep,name=nodeSelector"` + + PodLabelsToAdd map[string]string `json:"podLabels,omitempty" protobuf:"bytes,4,rep,name=podLabels"` + PodAnnotationsToAdd map[string]string `json:"podAnnotations,omitempty" protobuf:"bytes,5,rep,name=podAnnotations"` +} + +type ResourcePolicyStatus struct { + Pods []int64 `json:"pods,omitempty"` + LastUpdateTime *metav1.Time `json:"lastUpdateTime,omitempty"` +} +``` + +Pods are matched by a ResourcePolicy in the same namespace when their labels match the policy's `.spec.selector`. +A ResourcePolicy never matches pods in other namespaces, and one pod cannot be matched by more than one ResourcePolicy. + +Matched pods can only be scheduled on the units defined in `.spec.units`. Each item in `.spec.units` selects, through its `nodeSelector`, a set of nodes that represents one kind of resource in the cluster. + +Pods are scheduled against the units in the order defined by `.spec.units`. +`.spec.units[].max` defines the maximum number of pods that can be scheduled onto each unit. If `.spec.units[].max` is not set, pods can always be scheduled onto the unit unless it runs out of resources. +`.spec.units[].maxResources` defines the maximum amount of resources that can be consumed on each unit. If `.spec.units[].maxResources` is not set, pods can always be scheduled onto the unit unless it runs out of resources. + +`.spec.matchLabelKeys` indicates how the pods matched by `.spec.selector` are grouped; its behavior is like +`.spec.matchLabelKeys` in `PodTopologySpread`. + +### Implementation Details + +#### PreFilter +PreFilter checks whether the pod matches exactly one resource policy: if it matches more than one, the pod is rejected, and if it matches none, the plugin is skipped. +If exactly one policy matches, PreFilter counts the pods on each unit to determine which units are available for the pod +and writes this information into the CycleState. + +#### Filter +Filter checks whether the node belongs to an available unit. If the node does not belong to any unit, Filter returns unschedulable. + +Besides, Filter checks whether the pods already scheduled on the unit violate the quantity constraint. +If the number of pods has reached `.spec.units[].max`, all the nodes in the unit are marked unschedulable. + +#### Score + +Node score is `100 - (index of the unit)`, so nodes in earlier (higher-priority) units are preferred. + +#### PreBind + +PreBind adds annotations and labels to pods to ensure they can be scaled down in the order of the units.
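+
+The following is a minimal, self-contained sketch of the unit matching and scoring described above. It is an illustration only (the `unit`, `findUnit`, and `score` helpers are invented for this example, and the per-unit resource limits are omitted); the authoritative implementation lives in `pkg/resourcepolicy`.
+
+```go
+package main
+
+import "fmt"
+
+// unit is a simplified stand-in for one entry of .spec.units: a node
+// selector, an optional max pod count, and the number of pods already
+// counted against the unit.
+type unit struct {
+	name         string
+	nodeSelector map[string]string
+	maxPods      int // 0 means no limit
+	currentPods  int
+}
+
+// matches reports whether the node labels satisfy the unit's node selector.
+func (u unit) matches(nodeLabels map[string]string) bool {
+	for k, v := range u.nodeSelector {
+		if nodeLabels[k] != v {
+			return false
+		}
+	}
+	return true
+}
+
+// findUnit returns the index of the first available unit for the node, or -1.
+// Units are checked in the order they appear in .spec.units, which is what
+// makes earlier units higher priority.
+func findUnit(units []unit, nodeLabels map[string]string) int {
+	for i, u := range units {
+		if !u.matches(nodeLabels) {
+			continue
+		}
+		if u.maxPods > 0 && u.currentPods >= u.maxPods {
+			continue // quantity constraint violated: the unit is unavailable
+		}
+		return i
+	}
+	return -1
+}
+
+// score mirrors the Score rule above: 100 minus the unit index, so nodes in
+// earlier units are preferred. ok=false means Filter would reject the node.
+func score(units []unit, nodeLabels map[string]string) (int64, bool) {
+	idx := findUnit(units, nodeLabels)
+	if idx < 0 {
+		return 0, false
+	}
+	return int64(100 - idx), true
+}
+
+func main() {
+	units := []unit{
+		{name: "static", nodeSelector: map[string]string{"pool": "static"}, maxPods: 10, currentPods: 10},
+		{name: "spot", nodeSelector: map[string]string{"pool": "spot"}},
+	}
+	for _, labels := range []map[string]string{
+		{"pool": "static"},
+		{"pool": "spot"},
+	} {
+		if s, ok := score(units, labels); ok {
+			fmt.Printf("node %v -> score %d\n", labels, s)
+		} else {
+			fmt.Printf("node %v -> unschedulable\n", labels)
+		}
+	}
+}
+```
+
+In this sketch the `static` unit is already at its `max`, so a node in the static pool is rejected, while a node in the `spot` pool is scored `100 - 1 = 99`.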
+ +## Known limitations + +- Currently, deletion costs only take effect on Deployment workloads. + +## Test plans + +1. Add detailed unit and integration tests for the plugin and controller. +2. Add basic e2e tests to ensure all components work together. + +## Graduation criteria + +This plugin only takes effect when users enable it in the scheduler framework configuration and create a ResourcePolicy for their pods. +So it is safe to graduate to beta. + +* Beta +- [ ] Add node E2E tests. +- [ ] Provide beta-level documentation. + +## Feature enablement and rollback + +Enable the ResourcePolicy plugin via the `multiPoint` plugins in the scheduler configuration, like this: + +```yaml +apiVersion: kubescheduler.config.k8s.io/v1 +kind: KubeSchedulerConfiguration +leaderElection: + leaderElect: false +profiles: +- schedulerName: default-scheduler + plugins: + multiPoint: + enabled: + - name: ResourcePolicy +``` + + diff --git a/kep/594-resourcepolicy/kep.yaml b/kep/594-resourcepolicy/kep.yaml new file mode 100644 index 000000000..32b3a3f9a --- /dev/null +++ b/kep/594-resourcepolicy/kep.yaml @@ -0,0 +1,5 @@ +title: Resourcepolicy +kep-number: 594 +authors: + - "@KunWuLuan" + - "@fjding" diff --git a/pkg/config/samples/resourcepolicy/crd.yaml b/pkg/config/samples/resourcepolicy/crd.yaml new file mode 100644 index 000000000..00a48dd12 --- /dev/null +++ b/pkg/config/samples/resourcepolicy/crd.yaml @@ -0,0 +1,98 @@ +--- +apiVersion: apiextensions.k8s.io/v1 +kind: CustomResourceDefinition +metadata: + annotations: + controller-gen.kubebuilder.io/version: v0.18.1-0.20250610061224-4cb7e87f3b45 + name: resourcepolicies.scheduling.x-k8s.io +spec: + group: scheduling.x-k8s.io + names: + kind: ResourcePolicy + listKind: ResourcePolicyList + plural: resourcepolicies + shortNames: + - rsp + singular: resourcepolicy + scope: Namespaced + versions: + - name: v1alpha1 + schema: + openAPIV3Schema: + properties: + apiVersion: + description: |- + APIVersion defines the versioned schema of this representation of an object. + Servers should convert recognized schemas to the latest internal value, and + may reject unrecognized values. + More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources + type: string + kind: + description: |- + Kind is a string value representing the REST resource this object represents. + Servers may infer this from the endpoint the client submits requests to. + Cannot be updated. + In CamelCase. + More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds + type: string + metadata: + type: object + spec: + properties: + matchLabelKeys: + items: + type: string + type: array + selector: + additionalProperties: + type: string + type: object + units: + items: + properties: + max: + format: int32 + type: integer + maxResources: + additionalProperties: + anyOf: + - type: integer + - type: string + pattern: ^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$ + x-kubernetes-int-or-string: true + description: ResourceList is a set of (resource name, quantity) + pairs.
+ type: object + nodeSelector: + additionalProperties: + type: string + type: object + podAnnotations: + additionalProperties: + type: string + type: object + podLabels: + additionalProperties: + type: string + type: object + type: object + nullable: true + type: array + x-kubernetes-list-type: atomic + type: object + status: + properties: + lastUpdateTime: + format: date-time + type: string + pods: + items: + format: int64 + type: integer + type: array + type: object + required: + - spec + type: object + served: true + storage: true \ No newline at end of file diff --git a/pkg/config/samples/resourcepolicy/deployment.yaml b/pkg/config/samples/resourcepolicy/deployment.yaml new file mode 100644 index 000000000..1e6b74a59 --- /dev/null +++ b/pkg/config/samples/resourcepolicy/deployment.yaml @@ -0,0 +1,24 @@ +apiVersion: apps/v1 +kind: Deployment +metadata: + name: nginx-deployment +spec: + replicas: 1 + selector: + matchLabels: + app: nginx + template: + metadata: + labels: + app: nginx + spec: + schedulerName: custom-scheduler + containers: + - name: nginx-container + resources: + limits: + cpu: 50m + memory: 50Mi + image: nginx:latest + ports: + - containerPort: 80 \ No newline at end of file diff --git a/pkg/config/samples/resourcepolicy/resourcepolicy.yaml b/pkg/config/samples/resourcepolicy/resourcepolicy.yaml new file mode 100644 index 000000000..5104e7030 --- /dev/null +++ b/pkg/config/samples/resourcepolicy/resourcepolicy.yaml @@ -0,0 +1,12 @@ +apiVersion: scheduling.x-k8s.io/v1alpha1 +kind: ResourcePolicy +metadata: + name: resourcepolicy-sample + namespace: default +spec: + selector: + app: nginx + units: + - max: 1 + nodeSelector: + unit: first \ No newline at end of file diff --git a/pkg/config/samples/resourcepolicy/scheduler.config b/pkg/config/samples/resourcepolicy/scheduler.config new file mode 100644 index 000000000..d684dc3f5 --- /dev/null +++ b/pkg/config/samples/resourcepolicy/scheduler.config @@ -0,0 +1,12 @@ +apiVersion: kubescheduler.config.k8s.io/v1 +kind: KubeSchedulerConfiguration +leaderElection: + leaderElect: false +clientConnection: + kubeconfig: /home/kunwu1997/.kube/config +profiles: +- schedulerName: custom-scheduler + plugins: + multiPoint: + enabled: + - name: ResourcePolicy \ No newline at end of file diff --git a/pkg/resourcepolicy/resourcepolicy.go b/pkg/resourcepolicy/resourcepolicy.go new file mode 100644 index 000000000..00c845585 --- /dev/null +++ b/pkg/resourcepolicy/resourcepolicy.go @@ -0,0 +1,478 @@ +/* +Copyright 2025 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package resourcepolicy + +import ( + "context" + "fmt" + "math" + "strings" + + "github.com/KunWuLuan/resourcepolicyapi/pkg/apis/scheduling/v1alpha1" + + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/scheduler-plugins/pkg/util" + + v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/tools/cache" + "k8s.io/client-go/util/retry" + resourcehelper "k8s.io/component-helpers/resource" + "k8s.io/klog/v2" + "k8s.io/kubernetes/pkg/scheduler/framework" +) + +const Name = "ResourcePolicy" + +type resourcePolicyPlugin struct { + cache *resourcePolicyCache + + handle framework.Handle + + client client.Client + + schedulingCtx map[keyStr]*schedulingContext +} + +var _ framework.PreFilterPlugin = &resourcePolicyPlugin{} +var _ framework.FilterPlugin = &resourcePolicyPlugin{} +var _ framework.ScorePlugin = &resourcePolicyPlugin{} +var _ framework.ReservePlugin = &resourcePolicyPlugin{} +var _ framework.PreBindPlugin = &resourcePolicyPlugin{} + +func (rspp *resourcePolicyPlugin) Name() string { + return Name +} + +func New(ctx context.Context, args runtime.Object, handle framework.Handle) (framework.Plugin, error) { + lh := klog.FromContext(ctx).WithValues("plugin", Name) + lh.V(2).Info("creating new resourcepolicy plugin") + + podLister := handle.SharedInformerFactory().Core().V1().Pods().Lister() + nodeLister := handle.SharedInformerFactory().Core().V1().Nodes().Lister() + nodeInfoSnapshot := handle.SnapshotSharedLister().NodeInfos() + rspCache := NewResourcePolicyCache( + nodeLister, + podLister, + nodeInfoSnapshot, + ) + + scheme := runtime.NewScheme() + _ = v1.AddToScheme(scheme) + _ = v1alpha1.AddToScheme(scheme) + c, ccache, err := util.NewClientWithCachedReader(ctx, handle.KubeConfig(), scheme) + if err != nil { + return nil, err + } + defer func() { + ccache.Start(ctx) + ccache.WaitForCacheSync(ctx) + lh.V(2).Info("ResourcePolicyCache synced.") + }() + + rspInformer, err := ccache.GetInformerForKind(ctx, v1alpha1.SchemeGroupVersion.WithKind("ResourcePolicy")) + if err != nil { + return nil, err + } + rspInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + rsp, ok := obj.(*v1alpha1.ResourcePolicy) + if !ok { + return + } + rspCache.AddOrUpdateResPolicy(rsp) + }, + UpdateFunc: func(oldObj, newObj interface{}) { + rsp, ok := newObj.(*v1alpha1.ResourcePolicy) + if !ok { + return + } + rspCache.AddOrUpdateResPolicy(rsp) + }, + DeleteFunc: func(obj interface{}) { + switch t := obj.(type) { + case *v1alpha1.ResourcePolicy: + rsp := t + rspCache.DeleteResourcePolicy(rsp) + case cache.DeletedFinalStateUnknown: + rsp, ok := t.Obj.(*v1alpha1.ResourcePolicy) + if !ok { + return + } + rspCache.DeleteResourcePolicy(rsp) + } + }, + }) + + handle.SharedInformerFactory().Core().V1().Nodes().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + switch t := obj.(type) { + case *v1.Node: + node := t + pods, err := handle.SharedInformerFactory().Core().V1().Pods().Informer().GetIndexer().ByIndex("spec.nodeName", node.Name) + if err != nil { + klog.Error(err, "get node pods failed") + return + } + rspCache.processingLock.Lock() + defer rspCache.processingLock.Unlock() + + podKeys := []string{} + for _, i := range pods { + pod := i.(*v1.Pod) + rsp := GetManagedResourcePolicy(pod) + if rsp == "" { + continue + } + podKeys = append(podKeys, string(klog.KObj(pod).String())) + rspCache.AddOrUpdateBoundPod(pod) + } + 
klog.InfoS("add node event", "processedPod", strings.Join(podKeys, ",")) + } + }, + }) + + handle.SharedInformerFactory().Core().V1().Pods().Informer().AddIndexers(cache.Indexers{ + ManagedByResourcePolicyIndexKey: func(obj interface{}) ([]string, error) { + return []string{string(GetManagedResourcePolicy(obj.(*v1.Pod)))}, nil + }, + "spec.nodeName": func(obj interface{}) ([]string, error) { + p := obj.(*v1.Pod) + return []string{p.Spec.NodeName}, nil + }, + }) + handle.SharedInformerFactory().Core().V1().Pods().Informer().AddEventHandler(cache.FilteringResourceEventHandler{ + FilterFunc: func(obj interface{}) bool { + switch t := obj.(type) { + case *v1.Pod: + return GetManagedResourcePolicy(t) != "" && t.Spec.NodeName != "" + case cache.DeletedFinalStateUnknown: + if pod, ok := t.Obj.(*v1.Pod); ok { + return GetManagedResourcePolicy(pod) != "" && pod.Spec.NodeName != "" + } + return false + default: + return false + } + }, + Handler: cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + pd, ok := obj.(*v1.Pod) + if !ok { + return + } + rspCache.processingLock.Lock() + defer rspCache.processingLock.Unlock() + rspCache.AddOrUpdateBoundPod(pd) + klog.Info("add event for scheduled pod", "pod", klog.KObj(pd)) + }, + UpdateFunc: func(oldObj, newObj interface{}) { + pd, ok := newObj.(*v1.Pod) + if !ok { + return + } + rspCache.processingLock.Lock() + defer rspCache.processingLock.Unlock() + rspCache.AddOrUpdateBoundPod(pd) + klog.Info("update event for scheduled pod", "pod", klog.KObj(pd)) + }, + DeleteFunc: func(obj interface{}) { + var pd *v1.Pod + switch t := obj.(type) { + case *v1.Pod: + pd = t + case cache.DeletedFinalStateUnknown: + var ok bool + pd, ok = t.Obj.(*v1.Pod) + if !ok { + return + } + } + rspCache.DeleteBoundPod(pd) + }, + }, + }) + + plg := &resourcePolicyPlugin{ + cache: rspCache, + client: c, + handle: handle, + schedulingCtx: map[keyStr]*schedulingContext{}, + } + return plg, nil +} + +func (rspp *resourcePolicyPlugin) EventsToRegister(_ context.Context) ([]framework.ClusterEventWithHint, error) { + return []framework.ClusterEventWithHint{ + {}, + }, nil +} + +func (rspp *resourcePolicyPlugin) PreFilter(ctx context.Context, state *framework.CycleState, + pod *v1.Pod) (*framework.PreFilterResult, *framework.Status) { + var matched *resourcePolicyInfo + var schedCtx *schedulingContext + var ok bool + logger := klog.FromContext(ctx).WithValues("pod", klog.KObj(pod)) + rspp.cache.processingLock.RLock() + podKey := GetKeyStr(pod.ObjectMeta) + if schedCtx, ok = rspp.schedulingCtx[podKey]; ok && schedCtx.matched != "" { + matched = rspp.cache.getResourcePolicyInfoByKey(schedCtx.matched, pod.Namespace) + if matched == nil || matched.rv != schedCtx.resourceVersion { + schedCtx.matched = "" + matched = nil + } + } else { + schedCtx = &schedulingContext{} + rspp.schedulingCtx[podKey] = schedCtx + } + + if matched == nil { + resourcePoliciesInNamespace := rspp.cache.rps[pod.Namespace] + if len(resourcePoliciesInNamespace) == 0 { + rspp.cache.processingLock.RUnlock() + logger.V(2).Info("no resourcePolicy matches pod") + return nil, framework.NewStatus(framework.Skip) + } + + for _, rspi := range resourcePoliciesInNamespace { + if rspi.podSelector.Matches(labels.Set(pod.Labels)) { + if matched != nil { + rspp.cache.processingLock.RUnlock() + return nil, framework.NewStatus(framework.UnschedulableAndUnresolvable, "multiple resourcePolicies match pod") + } + matched = rspi + } + } + } + rspp.cache.processingLock.RUnlock() + + if matched == nil { + logger.V(2).Info("no 
resourcePolicy matches pod") + return nil, framework.NewStatus(framework.Skip, "no resourcePolicy matches pod") + } else { + logger = logger.WithValues("resourcePolicy", matched.ks) + schedCtx.matched = matched.ks + schedCtx.resourceVersion = matched.rv + schedCtx.unitIdx = 0 + } + matched.processingLock.Lock() + for matched.processing { + matched.cond.Wait() + } + matched.processingLock.Unlock() + + valid, labelKeyValue := genLabelKeyValueForPod(matched.policy, pod) + if !valid { + return nil, framework.NewStatus(framework.UnschedulableAndUnresolvable, "some labels not found in pod") + } + preFilterState := &ResourcePolicyPreFilterState{ + matchedInfo: matched, + podRes: framework.NewResource(resourcehelper.PodRequests(pod, resourcehelper.PodResourcesOptions{})), + labelKeyValue: labelKeyValue, + + currentCount: make([]int, len(matched.nodeSelectors)), + maxCount: make([]int, len(matched.nodeSelectors)), + maxConsumption: make([]*framework.Resource, len(matched.nodeSelectors)), + resConsumption: make([]*framework.Resource, len(matched.nodeSelectors)), + + nodeSelectos: make([]labels.Selector, len(matched.nodeSelectors)), + } + for idx, count := range matched.assumedPodCount { + preFilterState.currentCount[idx] = count[labelKeyValue] + } + for idx, consumption := range matched.assumedPodConsumption { + preFilterState.resConsumption[idx] = consumption[labelKeyValue] + } + for idx, max := range matched.maxPodResources { + if max == nil { + continue + } + preFilterState.maxConsumption[idx] = max.Clone() + } + copy(preFilterState.nodeSelectos, matched.nodeSelectors) + for idx, max := range matched.policy.Spec.Units { + if max.Max == nil { + preFilterState.maxCount[idx] = math.MaxInt32 + } else { + preFilterState.maxCount[idx] = int(*max.Max) + } + } + state.Write(ResourcePolicyPreFilterStateKey, preFilterState) + logger.V(2).Info("details of matched resource policy", "maxCount", preFilterState.maxCount, "currentCount", preFilterState.currentCount, + "maxConsumption", resourceListToStr(preFilterState.maxConsumption), "resConsumption", resourceListToStr(preFilterState.resConsumption), + "nodeSelectors", nodeSelectorsToStr(preFilterState.nodeSelectos)) + return nil, nil +} + +func nodeSelectorsToStr(selectors []labels.Selector) string { + res := []string{} + for _, selector := range selectors { + if selector == nil { + continue + } + res = append(res, selector.String()) + } + return strings.Join(res, "|") +} + +func resourceListToStr(list []*framework.Resource) string { + res := []string{} + for _, r := range list { + if r == nil { + res = append(res, "nil") + continue + } + res = append(res, fmt.Sprintf("cpu:%v,memory:%v,scalar:%+v", r.MilliCPU, r.Memory, r.ScalarResources)) + } + return strings.Join(res, "|") +} + +func (rspp *resourcePolicyPlugin) PreFilterExtensions() framework.PreFilterExtensions { + return nil +} + +type unitNotAvaiInfo struct { + idx int + res string +} + +func findAvailableUnitForNode(nodeInfo *framework.NodeInfo, state *ResourcePolicyPreFilterState) (int, []unitNotAvaiInfo) { + found := -1 + notValidIdx := []unitNotAvaiInfo{} + for idx := range state.nodeSelectos { + if !state.nodeSelectos[idx].Matches(labels.Set(nodeInfo.Node().Labels)) { + continue + } + if state.currentCount[idx] >= state.maxCount[idx] { + notValidIdx = append(notValidIdx, unitNotAvaiInfo{ + idx: idx, + res: "pod", + }) + continue + } + if state.maxConsumption[idx] != nil && state.resConsumption[idx] != nil { + res := state.resConsumption[idx].Clone() + res.MilliCPU += state.podRes.MilliCPU + 
res.Memory += state.podRes.Memory + for k, v := range state.podRes.ScalarResources { + res.AddScalar(k, v) + } + if gt, res := largeThan(res, state.maxConsumption[idx]); gt { + notValidIdx = append(notValidIdx, unitNotAvaiInfo{ + idx: idx, + res: res, + }) + continue + } + } + found = idx + break + } + return found, notValidIdx +} + +func (rspp *resourcePolicyPlugin) Filter(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeInfo *framework.NodeInfo) *framework.Status { + obj, err := state.Read(ResourcePolicyPreFilterStateKey) + if err != nil { + return nil + } + preFilterState, ok := obj.(*ResourcePolicyPreFilterState) + if !ok { + return framework.AsStatus(fmt.Errorf("cannot convert %T to ResourcePolicyPreFilterState", obj)) + } + if avai, info := findAvailableUnitForNode(nodeInfo, preFilterState); avai == -1 { + if len(info) == 0 { + return framework.NewStatus(framework.UnschedulableAndUnresolvable, fmt.Sprintf("not match any unit")) + } + return framework.NewStatus(framework.UnschedulableAndUnresolvable, fmt.Sprintf("unit not available: %v", info)) + } + return nil +} + +func (rspp *resourcePolicyPlugin) Score(ctx context.Context, state *framework.CycleState, p *v1.Pod, nodeName string) (int64, *framework.Status) { + obj, err := state.Read(ResourcePolicyPreFilterStateKey) + if err != nil { + return 0, nil + } + preFilterState, ok := obj.(*ResourcePolicyPreFilterState) + if !ok { + return 0, framework.AsStatus(fmt.Errorf("cannot convert %T to ResourcePolicyPreFilterState", obj)) + } + nodeInfo, err := rspp.handle.SnapshotSharedLister().NodeInfos().Get(nodeName) + if err != nil { + return 0, framework.NewStatus(framework.Error, fmt.Sprintf("getting node %q from Snapshot: %v", nodeName, err)) + } + if avai, info := findAvailableUnitForNode(nodeInfo, preFilterState); avai == -1 { + return 0, framework.NewStatus(framework.UnschedulableAndUnresolvable, fmt.Sprintf("unit not available: %v", info)) + } else { + return int64(100 - avai), nil + } +} + +func (rspp *resourcePolicyPlugin) ScoreExtensions() framework.ScoreExtensions { + return nil +} + +func (rspp *resourcePolicyPlugin) Reserve(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) *framework.Status { + nodeInfo, err := rspp.handle.SnapshotSharedLister().NodeInfos().Get(nodeName) + if err != nil { + return framework.NewStatus(framework.Error, fmt.Sprintf("getting node %q from Snapshot: %v", nodeName, err)) + } + err = rspp.cache.Assume(state, pod, nodeInfo.Node()) + if err != nil { + return framework.NewStatus(framework.Error, fmt.Sprintf("assuming pod %q: %v", pod.Name, err)) + } + return nil +} + +func (rspp *resourcePolicyPlugin) Unreserve(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) { + nodeInfo, err := rspp.handle.SnapshotSharedLister().NodeInfos().Get(nodeName) + if err != nil { + return + } + rspp.cache.Forget(state, pod, nodeInfo.Node()) +} + +func (rspp *resourcePolicyPlugin) PreBind(ctx context.Context, state *framework.CycleState, p *v1.Pod, nodeName string) *framework.Status { + obj, err := state.Read(ResourcePolicyPreFilterStateKey) + if err != nil { + return nil + } + preFilterState, ok := obj.(*ResourcePolicyPreFilterState) + if !ok { + return framework.AsStatus(fmt.Errorf("unable to convert state to ResourcePolicyPreFilterState")) + } + newPod := v1.Pod{} + err = retry.RetryOnConflict(retry.DefaultRetry, func() error { + err := rspp.client.Get(ctx, types.NamespacedName{Namespace: p.Namespace, Name: p.Name}, &newPod) + if err != nil { + return 
err + } + if len(newPod.Annotations) == 0 { + newPod.Annotations = make(map[string]string) + } + newPod.Annotations[ManagedByResourcePolicyAnnoKey] = string(preFilterState.matchedInfo.ks) + return rspp.client.Update(ctx, &newPod) + }) + if err != nil { + return framework.AsStatus(fmt.Errorf("unable to get pod %v/%v", p.Namespace, p.Name)) + } + return nil +} diff --git a/pkg/resourcepolicy/resourcepolicy_cache.go b/pkg/resourcepolicy/resourcepolicy_cache.go new file mode 100644 index 000000000..f0c8b4264 --- /dev/null +++ b/pkg/resourcepolicy/resourcepolicy_cache.go @@ -0,0 +1,310 @@ +/* +Copyright 2025 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package resourcepolicy + +import ( + "fmt" + "sync" + + "github.com/KunWuLuan/resourcepolicyapi/pkg/apis/scheduling/v1alpha1" + + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/types" + v1 "k8s.io/client-go/listers/core/v1" + "k8s.io/client-go/util/workqueue" + resourcehelper "k8s.io/component-helpers/resource" + "k8s.io/klog/v2" + "k8s.io/kubernetes/pkg/scheduler/framework" +) + +type resourcePolicyCache struct { + processingLock sync.RWMutex + + // rps is map, keyed with namespace + // value is map, keyed with NamespaceName of ResourcePolicy + rps map[string]map[keyStr]*resourcePolicyInfo + // pd2Rps stores all pods that have been assumed + // pd2Rps is map, keyed with namespaceName of pod + // value is namespaceName of ResourcePolicy + pd2Rps map[keyStr]keyStr + assumedPd2Node map[keyStr]string + + wq workqueue.TypedRateLimitingInterface[types.NamespacedName] + + nl v1.NodeLister + pl v1.PodLister + ss framework.NodeInfoLister +} + +func NewResourcePolicyCache( + nl v1.NodeLister, + pl v1.PodLister, + ss framework.NodeInfoLister, +) *resourcePolicyCache { + cache := &resourcePolicyCache{ + rps: make(map[string]map[keyStr]*resourcePolicyInfo), + pd2Rps: make(map[keyStr]keyStr), + + wq: workqueue.NewTypedRateLimitingQueueWithConfig[types.NamespacedName]( + workqueue.DefaultTypedControllerRateLimiter[types.NamespacedName](), + workqueue.TypedRateLimitingQueueConfig[types.NamespacedName]{Name: "resourcepolicy"}), + + nl: nl, + pl: pl, + ss: ss, + } + go cache.updateLoop() + return cache +} + +// get the lock outside +func (rspc *resourcePolicyCache) getResourcePolicyInfoByKey(key keyStr, namespace string) *resourcePolicyInfo { + rpsInNs := rspc.rps[namespace] + if len(rpsInNs) == 0 { + return nil + } + return rpsInNs[key] +} + +func (rspc *resourcePolicyCache) Assume(cycleState *framework.CycleState, pod *corev1.Pod, node *corev1.Node) error { + state, err := cycleState.Read(ResourcePolicyPreFilterStateKey) + if err != nil { + return nil + } + preFilterState, ok := state.(*ResourcePolicyPreFilterState) + if !ok { + return fmt.Errorf("unable to convert state to ResourcePolicyPreFilterState") + } + + rspc.processingLock.Lock() + defer rspc.processingLock.Unlock() + + podKey := GetKeyStr(pod.ObjectMeta) + if node, ok := rspc.assumedPd2Node[podKey]; ok { + return fmt.Errorf("PodAlreadyAssumed 
assumed node: %v", node) + } + rspc.pd2Rps[podKey] = preFilterState.matchedInfo.ks + + rspi := preFilterState.matchedInfo + for idx, sel := range rspi.nodeSelectors { + if !sel.Matches(labels.Set(node.Labels)) { + continue + } + rspi.addPodToBoundOrAssumedPods(rspi.assumedPods, idx, preFilterState.labelKeyValue, node.Name, podKey, preFilterState.podRes) + } + return nil +} + +func (rspc *resourcePolicyCache) Forget(cycleState *framework.CycleState, pod *corev1.Pod, node *corev1.Node) { + state, err := cycleState.Read(ResourcePolicyPreFilterStateKey) + if err != nil { + return + } + preFilterState, ok := state.(*ResourcePolicyPreFilterState) + if !ok { + return + } + + rspc.processingLock.Lock() + defer rspc.processingLock.Unlock() + + podKey := GetKeyStr(pod.ObjectMeta) + delete(rspc.assumedPd2Node, podKey) + + rspi := preFilterState.matchedInfo + for idx, sel := range rspi.nodeSelectors { + if !sel.Matches(labels.Set(node.Labels)) { + continue + } + rspi.removePodFromBoundOrAssumedPods(rspi.assumedPods, idx, preFilterState.labelKeyValue, node.Name, podKey) + } +} + +// need to obtain lock outside +func (rspc *resourcePolicyCache) AddOrUpdateBoundPod(p *corev1.Pod) { + podKey := GetKeyStr(p.ObjectMeta) + rspkey := GetManagedResourcePolicy(p) + assumedRspKey, ok := rspc.pd2Rps[podKey] + if ok && assumedRspKey != rspkey { + klog.ErrorS(fmt.Errorf("AssignedResourcePolicyNotMatch"), "bound pod is managed by another resourcepolicy", "assumed", assumedRspKey, "bound", rspkey) + delete(rspc.pd2Rps, podKey) + + rsps, ok := rspc.rps[p.Namespace] + if ok { + rspi, ok := rsps[assumedRspKey] + if ok { + valid, labelKeyValue := genLabelKeyValueForPod(rspi.policy, p) + if valid { + rspi.processingLock.Lock() + rspi.removePod(podKey, labelKeyValue) + rspi.processingLock.Unlock() + } + } + } + } + + boundRsp := rspc.getResourcePolicyInfoByKey(rspkey, p.Namespace) + if boundRsp == nil { + klog.ErrorS(fmt.Errorf("ResourcePolicyInfoNotFound"), "bound pod is managed by a resourcepolicy that is not found", "bound", rspkey) + return + } + + nodeName := p.Spec.NodeName + if nodeName == "" { + klog.ErrorS(fmt.Errorf("PodNotBound"), "pod is not bound", "pod", klog.KObj(p)) + return + } + + node, err := rspc.nl.Get(nodeName) + if err != nil { + klog.ErrorS(err, "failed to get node", "node", nodeName) + return + } + + valid, labelKeyValue := genLabelKeyValueForPod(boundRsp.policy, p) + if !valid { + return + } + boundRsp.processingLock.Lock() + defer boundRsp.processingLock.Unlock() + podRes := resourcehelper.PodRequests(p, resourcehelper.PodResourcesOptions{}) + for idx, sel := range boundRsp.nodeSelectors { + if !sel.Matches(labels.Set(node.Labels)) { + // TODO: remove pod from this count + // do not remove node from unit by update node values + continue + } + boundRsp.removePodFromBoundOrAssumedPods(boundRsp.assumedPods, idx, labelKeyValue, node.Name, podKey) + boundRsp.addPodToBoundOrAssumedPods(boundRsp.boundPods, idx, labelKeyValue, nodeName, podKey, framework.NewResource(podRes)) + } +} + +func (rspc *resourcePolicyCache) DeleteBoundPod(p *corev1.Pod) { + rspc.processingLock.Lock() + defer rspc.processingLock.Unlock() + + podKey := GetKeyStr(p.ObjectMeta) + rspkey := GetManagedResourcePolicy(p) + rsp := rspc.getResourcePolicyInfoByKey(rspkey, p.Namespace) + if rsp != nil { + valid, labelKeyValue := genLabelKeyValueForPod(rsp.policy, p) + if valid { + rsp.removePod(podKey, labelKeyValue) + } + } + + assumedRspKey := rspc.pd2Rps[podKey] + assumedRsp := rspc.getResourcePolicyInfoByKey(assumedRspKey, p.Namespace) 
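+	// If the pod was also assumed against a policy during scheduling (tracked in pd2Rps),
+	// remove it from that policy's bookkeeping as well before clearing the assumption maps.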
+ if assumedRsp != nil { + valid, labelKeyValue := genLabelKeyValueForPod(assumedRsp.policy, p) + if valid { + assumedRsp.removePod(podKey, labelKeyValue) + } + } + delete(rspc.pd2Rps, podKey) + delete(rspc.assumedPd2Node, podKey) +} + +func (rspc *resourcePolicyCache) DeleteResourcePolicy(rsp *v1alpha1.ResourcePolicy) { + rspc.processingLock.Lock() + defer rspc.processingLock.Unlock() + + ns := rsp.Namespace + rspKey := GetKeyStr(rsp.ObjectMeta) + + if rspInNs, ok := rspc.rps[ns]; ok { + if _, rspok := rspInNs[rspKey]; rspok { + delete(rspInNs, rspKey) + } + if len(rspInNs) == 0 { + delete(rspc.rps, ns) + } + } +} + +func (rspc *resourcePolicyCache) AddOrUpdateResPolicy(rsp *v1alpha1.ResourcePolicy) { + rspc.processingLock.Lock() + defer rspc.processingLock.Unlock() + + ns := rsp.Namespace + rspKey := GetKeyStr(rsp.ObjectMeta) + + if rspInNs, ok := rspc.rps[ns]; ok { + if rspinfo, rspok := rspInNs[rspKey]; rspok { + if rspinfo.rv == rsp.ResourceVersion { + return + } + + rspinfo.processingLock.Lock() + rspinfo.processing = true + + rspinfo.rv = rsp.ResourceVersion + rspinfo.policy = rsp + rspinfo.podSelector = labels.SelectorFromSet(rsp.Spec.Selector) + rspinfo.processingLock.Unlock() + + rspc.wq.AddRateLimited(types.NamespacedName{Namespace: rsp.Namespace, Name: rsp.Name}) + return + } + } else { + rspc.rps[ns] = make(map[keyStr]*resourcePolicyInfo) + } + + newRspInfo := newResourcePolicyInfo() + newRspInfo.processingLock.Lock() + newRspInfo.processing = true + rspc.rps[ns][rspKey] = newRspInfo + + newRspInfo.ks = rspKey + newRspInfo.rv = rsp.ResourceVersion + newRspInfo.policy = rsp + newRspInfo.podSelector = labels.SelectorFromSet(rsp.Spec.Selector) + newRspInfo.processingLock.Unlock() + + rspc.wq.AddRateLimited(types.NamespacedName{Namespace: rsp.Namespace, Name: rsp.Name}) +} + +func (rspc *resourcePolicyCache) updateLoop() { + for { + item, shutdown := rspc.wq.Get() + if shutdown { + return + } + + rspc.processingLock.RLock() + ns := item.Namespace + rspKey := item.Namespace + "/" + item.Name + + var ok bool + var rspInNs map[keyStr]*resourcePolicyInfo + var rspinfo *resourcePolicyInfo + if rspInNs, ok = rspc.rps[ns]; !ok { + rspc.wq.Done(item) + rspc.processingLock.RUnlock() + continue + } + if rspinfo, ok = rspInNs[keyStr(rspKey)]; !ok { + rspc.wq.Done(item) + rspc.processingLock.RUnlock() + continue + } + rspc.processingLock.RUnlock() + + rspinfo.complete(rspc.pl, rspc.nl, rspc.ss, rspc.pd2Rps) + } +} diff --git a/pkg/resourcepolicy/resourcepolicy_test.go b/pkg/resourcepolicy/resourcepolicy_test.go new file mode 100644 index 000000000..6c7acf843 --- /dev/null +++ b/pkg/resourcepolicy/resourcepolicy_test.go @@ -0,0 +1,81 @@ +package resourcepolicy + +import ( + "testing" + + "github.com/stretchr/testify/assert" + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" + framework "k8s.io/kubernetes/pkg/scheduler/framework" +) + +func TestFindAvailableUnitForNode(t *testing.T) { + node := &v1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-node", + Labels: map[string]string{ + "region": "us-east-1", + }, + }, + } + + nodeInfo := framework.NewNodeInfo() + nodeInfo.SetNode(node) + + state := &ResourcePolicyPreFilterState{ + nodeSelectos: []labels.Selector{ + labels.SelectorFromSet(labels.Set{"region": "us-east-1"}), + }, + currentCount: []int{0}, + maxCount: []int{1}, + resConsumption: []*framework.Resource{ + {MilliCPU: 500, Memory: 512}, + }, + maxConsumption: []*framework.Resource{ + {MilliCPU: 1000, Memory: 1024}, + }, + 
podRes: &framework.Resource{ + MilliCPU: 400, Memory: 256, + }, + } + + // Test case 1 + idx, notValid := findAvailableUnitForNode(nodeInfo, state) + assert.Equal(t, 0, idx) + assert.Empty(t, notValid) + + // Test case 2: No matching node selector + state.nodeSelectos[0] = labels.SelectorFromSet(labels.Set{"region": "us-west-1"}) + idx, notValid = findAvailableUnitForNode(nodeInfo, state) + assert.Equal(t, -1, idx) + assert.Empty(t, notValid) + + // Test case 3: Max count reached + state.nodeSelectos[0] = labels.SelectorFromSet(labels.Set{"region": "us-east-1"}) + state.currentCount[0] = 1 + idx, notValid = findAvailableUnitForNode(nodeInfo, state) + assert.Equal(t, -1, idx) + assert.Len(t, notValid, 1) + assert.Equal(t, "pod", notValid[0].res) + + // Test case 4: Resource consumption exceeds limit + state.currentCount[0] = 0 + state.resConsumption[0] = &framework.Resource{ + MilliCPU: 900, Memory: 900, + } + idx, notValid = findAvailableUnitForNode(nodeInfo, state) + assert.Equal(t, -1, idx) + assert.Len(t, notValid, 1) + assert.Equal(t, "cpu", notValid[0].res) + + // Test case 5: memory consumption exceeds limit + state.currentCount[0] = 0 + state.resConsumption[0] = &framework.Resource{ + MilliCPU: 100, Memory: 900, + } + idx, notValid = findAvailableUnitForNode(nodeInfo, state) + assert.Equal(t, -1, idx) + assert.Len(t, notValid, 1) + assert.Equal(t, "memory", notValid[0].res) +} diff --git a/pkg/resourcepolicy/resourcepolicyinfo.go b/pkg/resourcepolicy/resourcepolicyinfo.go new file mode 100644 index 000000000..210bfc036 --- /dev/null +++ b/pkg/resourcepolicy/resourcepolicyinfo.go @@ -0,0 +1,260 @@ +/* +Copyright 2025 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package resourcepolicy + +import ( + "fmt" + "sync" + + "github.com/KunWuLuan/resourcepolicyapi/pkg/apis/scheduling/v1alpha1" + + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/util/sets" + listersv1 "k8s.io/client-go/listers/core/v1" + resourcehelper "k8s.io/component-helpers/resource" + "k8s.io/klog/v2" + "k8s.io/kubernetes/pkg/scheduler/framework" +) + +type resourcePolicyInfo struct { + processingLock sync.Mutex + // when processing is ture, means cache of the rspinfo is waiting for reconcile + // do not schedule pod in this rsp. + processing bool + cond *sync.Cond + + ks keyStr + rv string + + policy *v1alpha1.ResourcePolicy + podSelector labels.Selector + nodeSelectors []labels.Selector + // + // [unit.index] { + // "node1": { + // "/": [ pod1.ns/name, ... ], + // "matchLabelKeys": [ pod2.ns/name, ... 
], + // } + // } + assumedPods multiLevelPodSet + boundPods multiLevelPodSet + // contains assumed and bound pods + // [unit.index] { + // "/": 1 + // "matchLabelKeys-v1": 1 + // } + assumedPodCount []map[labelKeysValue]int + // + // key: pod.Namespace/pod.Name + podResourceDetails map[keyStr]*framework.Resource + maxPodResources []*framework.Resource + assumedPodConsumption []map[labelKeysValue]*framework.Resource +} + +func newResourcePolicyInfo() *resourcePolicyInfo { + rspinfo := &resourcePolicyInfo{ + assumedPods: make(multiLevelPodSet, 0), + boundPods: make(multiLevelPodSet, 0), + assumedPodCount: make([]map[labelKeysValue]int, 0), + podResourceDetails: make(map[keyStr]*framework.Resource), + maxPodResources: make([]*framework.Resource, 0), + assumedPodConsumption: make([]map[labelKeysValue]*framework.Resource, 0), + } + rspinfo.cond = sync.NewCond(&rspinfo.processingLock) + return rspinfo +} + +func (rspi *resourcePolicyInfo) complete(pl listersv1.PodLister, nl listersv1.NodeLister, ss framework.NodeInfoLister, + assumedPods map[keyStr]keyStr) { + if !rspi.processing { + return + } + if rspi.policy == nil { + klog.ErrorS(fmt.Errorf("ResourcePolicyInfoNotInited"), "resourcePolicyInfo not inited", "rspKey", rspi.ks) + return + } + + rspi.processingLock.Lock() + defer func() { + // resourcepolicy has been initialized, pods can be scheduled again + rspi.processing = false + rspi.processingLock.Unlock() + rspi.cond.Broadcast() + }() + + rspi.nodeSelectors = make([]labels.Selector, len(rspi.policy.Spec.Units)) + rspi.assumedPods = make(multiLevelPodSet, len(rspi.policy.Spec.Units)) + rspi.assumedPodConsumption = make([]map[labelKeysValue]*framework.Resource, len(rspi.policy.Spec.Units)) + rspi.maxPodResources = make([]*framework.Resource, len(rspi.policy.Spec.Units)) + rspi.boundPods = make(multiLevelPodSet, len(rspi.policy.Spec.Units)) + rspi.assumedPodCount = make([]map[labelKeysValue]int, len(rspi.policy.Spec.Units)) + for idx, unit := range rspi.policy.Spec.Units { + selector := labels.SelectorFromSet(unit.NodeSelector) + rspi.nodeSelectors[idx] = selector + if len(unit.MaxResources) > 0 { + rspi.maxPodResources[idx] = framework.NewResource(unit.MaxResources) + } + nodes, err := nl.List(selector) + if err != nil { + continue + } + for _, no := range nodes { + ni, err := ss.Get(no.Name) + if err != nil { + continue + } + for _, po := range ni.Pods { + if GetManagedResourcePolicy(po.Pod) != rspi.ks && + assumedPods[GetKeyStr(po.Pod.ObjectMeta)] != rspi.ks { + continue + } + ok, labelKeyValue := genLabelKeyValueForPod(rspi.policy, po.Pod) + if !ok { + continue + } + + res := resourcehelper.PodRequests(po.Pod, resourcehelper.PodResourcesOptions{}) + rspi.addPodToBoundOrAssumedPods(rspi.boundPods, idx, labelKeyValue, no.Name, GetKeyStr(po.Pod.ObjectMeta), framework.NewResource(res)) + } + } + } +} + +// lock should be get outside +// TODO: when pod resource is chanaged, assumedPodConsumption maybe wrong +func (r *resourcePolicyInfo) addPodToBoundOrAssumedPods(ps multiLevelPodSet, index int, labelValues labelKeysValue, nodename string, podKey keyStr, res *framework.Resource) bool { + if len(ps[index]) == 0 { + ps[index] = make(map[string]map[labelKeysValue]sets.Set[keyStr]) + } + podsetByValue := ps[index][nodename] + if podsetByValue == nil { + podsetByValue = make(map[labelKeysValue]sets.Set[keyStr]) + } + podset := podsetByValue[labelValues] + if podset == nil { + podset = sets.New[keyStr]() + } + if podset.Has(podKey) { + return false + } + podset.Insert(podKey) + 
podsetByValue[labelValues] = podset + ps[index][nodename] = podsetByValue + if curRes, ok := r.podResourceDetails[podKey]; !ok || !equalResource(res, curRes) { + r.podResourceDetails[podKey] = res + } + r.updateAssumedPodCount(index, labelValues, 1, res) + return true +} + +// TODO: handle the case that pod's labelKeyValue was changed +func (r *resourcePolicyInfo) removePod(podKeyStr keyStr, labelKeyValue labelKeysValue) { + for _, podSetByNode := range r.boundPods { + for _, podSetByLabelValues := range podSetByNode { + for _, podSet := range podSetByLabelValues { + podSet.Delete(podKeyStr) + } + } + } + for _, podSetByNode := range r.assumedPods { + for _, podSetByLabelValues := range podSetByNode { + for _, podSet := range podSetByLabelValues { + podSet.Delete(podKeyStr) + } + } + } + + podRes := r.podResourceDetails[podKeyStr] + if podRes != nil { + for idx := range r.assumedPodConsumption { + r.updateAssumedPodCount(idx, labelKeyValue, -1, podRes) + } + delete(r.podResourceDetails, podKeyStr) + } +} + +// lock should be get outside +func (r *resourcePolicyInfo) removePodFromBoundOrAssumedPods(ps multiLevelPodSet, index int, labelValues labelKeysValue, nodename string, podKey keyStr) (bool, error) { + podsetByValue := ps[index][nodename] + if podsetByValue == nil { + return false, nil // fmt.Errorf("labelValues %v not found", labelValues) + } + podset := podsetByValue[labelValues] + if podset == nil { + return false, nil // fmt.Errorf("node %v not found", nodename) + } + if !podset.Has(podKey) { + return false, fmt.Errorf("pod %v not found", podKey) + } + podset.Delete(podKey) + if podset.Len() == 0 { + delete(podsetByValue, labelValues) + } else { + podsetByValue[labelValues] = podset + } + ps[index][nodename] = podsetByValue + + res := framework.NewResource(nil) + if detail, ok := r.podResourceDetails[podKey]; ok { + res = detail + } + if err := r.updateAssumedPodCount(index, labelValues, -1, res); err != nil { + return false, err + } + return true, nil +} + +func (r *resourcePolicyInfo) updateAssumedPodCount(index int, v labelKeysValue, count int, totalRes *framework.Resource) error { + if count > 0 { + if r.assumedPodCount[index] == nil { + r.assumedPodCount[index] = map[labelKeysValue]int{v: count} + } else { + r.assumedPodCount[index][v] += count + } + r.updateAssumedPodResource(index, v, true, totalRes) + } else { + if r.assumedPodCount[index] == nil { + return fmt.Errorf("pod not found in assumedPodCount, this should never happen") + } else { + r.assumedPodCount[index][v] += count + if r.assumedPodCount[index][v] == 0 { + delete(r.assumedPodCount[index], v) + } + } + r.updateAssumedPodResource(index, v, false, totalRes) + } + return nil +} + +func (r *resourcePolicyInfo) updateAssumedPodResource(index int, v labelKeysValue, add bool, totalRes *framework.Resource) { + if add { + if r.assumedPodConsumption[index] == nil { + r.assumedPodConsumption[index] = map[labelKeysValue]*framework.Resource{v: totalRes.Clone()} + } else { + addResources(r.assumedPodConsumption[index][v], totalRes) + } + } else { + if r.assumedPodConsumption[index] == nil { + return + } else { + if r.assumedPodConsumption[index][v] == nil { + return + } + subResources(r.assumedPodConsumption[index][v], totalRes) + } + } +} diff --git a/pkg/resourcepolicy/schedulingcontext.go b/pkg/resourcepolicy/schedulingcontext.go new file mode 100644 index 000000000..ce40d004f --- /dev/null +++ b/pkg/resourcepolicy/schedulingcontext.go @@ -0,0 +1,27 @@ +/* +Copyright 2025 The Kubernetes Authors. 
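updateAssumedPodCount above keeps one map per unit from a matchLabelKeys value to a pod count and deletes the entry once the count returns to zero, so stale keys do not accumulate; updateAssumedPodResource does the analogous add/subtract for the resource totals. A tiny self-contained sketch of that counting discipline, with an invented bump helper, follows.

package main

import "fmt"

func main() {
	// One map per unit from a matchLabelKeys value to a pod count.
	assumedPodCount := make([]map[string]int, 1) // a single unit

	bump := func(unit int, lv string, delta int) {
		if assumedPodCount[unit] == nil {
			assumedPodCount[unit] = map[string]int{}
		}
		assumedPodCount[unit][lv] += delta
		if assumedPodCount[unit][lv] == 0 {
			delete(assumedPodCount[unit], lv) // drop the key at zero
		}
	}

	bump(0, "v1", 1)                // pod assumed
	bump(0, "v1", 1)                // second pod with the same label value
	bump(0, "v1", -1)               // one pod removed
	fmt.Println(assumedPodCount[0]) // map[v1:1]
	bump(0, "v1", -1)
	fmt.Println(assumedPodCount[0]) // map[] - entry removed at zero
}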
+ +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package resourcepolicy + +// schedulingContext is created when the pod scheduling started. and the context is deleted +// when the pod is deleted or bound. +type schedulingContext struct { + matched keyStr + // when begin to schedule, unitIdx is the index of last scheduling attempt + // in the scheduling cycle, unitIdx is the index of current scheduling attempt + unitIdx int + resourceVersion string +} diff --git a/pkg/resourcepolicy/types.go b/pkg/resourcepolicy/types.go new file mode 100644 index 000000000..30c953c44 --- /dev/null +++ b/pkg/resourcepolicy/types.go @@ -0,0 +1,205 @@ +/* +Copyright 2025 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package resourcepolicy + +import ( + "strings" + + "github.com/KunWuLuan/resourcepolicyapi/pkg/apis/scheduling/v1alpha1" + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/kubernetes/pkg/scheduler/framework" +) + +type keyStr string +type labelKeysValue string +type multiLevelPodSet []map[string]map[labelKeysValue]sets.Set[keyStr] + +func genLabelKeysValue(strs []string) labelKeysValue { + return labelKeysValue(strings.Join(strs, "/")) +} + +// ret: valid, labelValue +func genLabelKeyValueForPod(rp *v1alpha1.ResourcePolicy, pod *v1.Pod) (bool, labelKeysValue) { + if len(rp.Spec.MatchLabelKeys) == 0 { + return true, "/" + } + value := "" + for _, key := range rp.Spec.MatchLabelKeys { + if v, ok := pod.Labels[key]; !ok { + return false, "" + } else { + value += v + } + } + return true, labelKeysValue(value) +} + +func GetKeyStr(obj metav1.ObjectMeta) keyStr { + return keyStr(obj.Namespace + "/" + obj.Name) +} + +const ManagedByResourcePolicyAnnoKey = "scheduling.x-k8s.io/managed-by-resourcepolicy" +const ManagedByResourcePolicyIndexKey = "metadata.annotations[scheduling.x-k8s.io/managed-by-resourcepolicy]" +const ResourcePolicyPreFilterStateKey = "scheduling.x-k8s.io/resourcepolicy-prefilter-state" + +type ResourcePolicyPreFilterState struct { + matchedInfo *resourcePolicyInfo + + maxCount []int + currentCount []int + resConsumption []*framework.Resource + maxConsumption []*framework.Resource + + labelKeyValue labelKeysValue + podRes *framework.Resource + + nodeSelectos []labels.Selector +} + +func (r *ResourcePolicyPreFilterState) Clone() framework.StateData { + ret := &ResourcePolicyPreFilterState{ + matchedInfo: r.matchedInfo, + + maxCount: SliceCopyInt(r.maxCount), + currentCount: 
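genLabelKeyValueForPod above is what buckets pods inside a unit: with no matchLabelKeys every pod falls into the "/" bucket, and otherwise the pod must carry every configured key or it is rejected. Note that it concatenates the values directly, while genLabelKeysValue joins with "/"; direct concatenation can collide ("a"+"bc" equals "ab"+"c"). The sketch below restates the logic with a separator and an invented labelKeyValue function, purely as an illustration.

package main

import (
	"fmt"
	"strings"
)

// labelKeyValue is a hedged restatement of genLabelKeyValueForPod: no keys means
// the shared "/" bucket; a missing key means the pod is not bucketed at all.
func labelKeyValue(matchLabelKeys []string, podLabels map[string]string) (bool, string) {
	if len(matchLabelKeys) == 0 {
		return true, "/"
	}
	vals := make([]string, 0, len(matchLabelKeys))
	for _, k := range matchLabelKeys {
		v, ok := podLabels[k]
		if !ok {
			return false, "" // pod is missing a required label
		}
		vals = append(vals, v)
	}
	// Joined with a separator here to avoid collisions between value combinations.
	return true, strings.Join(vals, "/")
}

func main() {
	fmt.Println(labelKeyValue(nil, map[string]string{"app": "web"}))
	fmt.Println(labelKeyValue([]string{"app", "rev"}, map[string]string{"app": "web", "rev": "2"}))
	fmt.Println(labelKeyValue([]string{"app", "rev"}, map[string]string{"app": "web"}))
}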
SliceCopyInt(r.currentCount), + resConsumption: SliceCopyRes(r.resConsumption), + maxConsumption: SliceCopyRes(r.maxConsumption), + + labelKeyValue: r.labelKeyValue, + podRes: r.podRes, + } + return ret +} + +func GetManagedResourcePolicy(p *v1.Pod) keyStr { + return keyStr(p.Annotations[ManagedByResourcePolicyAnnoKey]) +} + +func equalResource(res1, res2 *framework.Resource) bool { + if res1 == nil && res2 == nil { + return true + } + if res1 == nil && res2 != nil || res1 != nil && res2 == nil { + return false + } + if res1.MilliCPU != res2.MilliCPU || + res1.Memory != res2.Memory || + res1.EphemeralStorage != res2.EphemeralStorage || + res1.AllowedPodNumber != res2.AllowedPodNumber { + return false + } + if len(res1.ScalarResources) != len(res2.ScalarResources) { + return false + } + for k, v := range res1.ScalarResources { + if v2, ok := res2.ScalarResources[k]; !ok || v != v2 { + return false + } + } + return true +} + +func addResources(res1, res2 *framework.Resource) { + if res1 == nil || res2 == nil { + return + } + res1.MilliCPU += res2.MilliCPU + res1.Memory += res2.Memory + res1.EphemeralStorage += res2.EphemeralStorage + res1.AllowedPodNumber += res2.AllowedPodNumber + for k, v := range res2.ScalarResources { + if _, ok := res1.ScalarResources[k]; !ok { + res1.ScalarResources[k] = v + } else { + res1.ScalarResources[k] += v + } + } +} + +func subResources(res1, res2 *framework.Resource) { + if res1 == nil || res2 == nil { + return + } + res1.MilliCPU -= res2.MilliCPU + if res1.MilliCPU < 0 { + res1.MilliCPU = 0 + } + res1.Memory -= res2.Memory + if res1.Memory < 0 { + res1.Memory = 0 + } + res1.EphemeralStorage -= res2.EphemeralStorage + if res1.EphemeralStorage < 0 { + res1.EphemeralStorage = 0 + } + res1.AllowedPodNumber -= res2.AllowedPodNumber + if res1.AllowedPodNumber < 0 { + res1.AllowedPodNumber = 0 + } + for k, v := range res2.ScalarResources { + if _, ok := res1.ScalarResources[k]; ok { + res1.ScalarResources[k] -= v + if res1.ScalarResources[k] < 0 { + res1.ScalarResources[k] = 0 + } + } + } +} + +func largeThan(res1, res2 *framework.Resource) (bool, string) { + if res1 == nil && res2 == nil { + return false, "" + } + if res1 == nil { + return false, "" + } + if res2 == nil { + return true, "" + } + if res1.MilliCPU > 0 && res1.MilliCPU > res2.MilliCPU { + return true, "cpu" + } + if res1.Memory > 0 && res1.Memory > res2.Memory { + return true, "memory" + } + if res1.EphemeralStorage > 0 && res1.EphemeralStorage > res2.EphemeralStorage { + return true, "ephemeral-storage" + } + for k, v := range res1.ScalarResources { + if v2, ok := res2.ScalarResources[k]; !ok || ok && v > 0 && v > v2 { + return true, string(k) + } + } + return false, "" +} + +func SliceCopyInt(src []int) []int { + dst := make([]int, len(src)) + copy(dst, src) + return dst +} + +func SliceCopyRes(src []*framework.Resource) []*framework.Resource { + dst := make([]*framework.Resource, len(src)) + for i := range src { + dst[i] = src[i].Clone() + } + return dst +}
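The helpers above are meant to compose: addResources projects a unit's consumption if the pod were placed there, largeThan reports the first dimension that would exceed the unit's cap, and subResources rolls the projection back, clamping at zero. The snippet below shows that composition; it would have to live inside pkg/resourcepolicy (for example in a _test.go) because the helpers are unexported, and demoProjectedFit is an invented name used only for this sketch.

package resourcepolicy

import (
	"fmt"

	"k8s.io/kubernetes/pkg/scheduler/framework"
)

// demoProjectedFit is a hypothetical helper (not part of the PR) showing how the
// arithmetic helpers are intended to be combined.
func demoProjectedFit() {
	used := &framework.Resource{MilliCPU: 900, Memory: 900}
	podReq := &framework.Resource{MilliCPU: 400, Memory: 256}
	limit := &framework.Resource{MilliCPU: 1000, Memory: 1024}

	// Projected consumption if the pod were placed in this unit.
	projected := used.Clone()
	addResources(projected, podReq) // 1300m CPU, 1156 memory

	exceeded, dim := largeThan(projected, limit)
	fmt.Println(exceeded, dim) // true cpu - matches test case 4 above

	subResources(projected, podReq) // back to 900/900; values clamp at zero
	exceeded, dim = largeThan(projected, limit)
	fmt.Println(exceeded, dim) // false
}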