From ac0f4f732753aca9b216867092c686ac541f1a38 Mon Sep 17 00:00:00 2001 From: KunWuLuan Date: Sun, 11 May 2025 11:35:40 +0800 Subject: [PATCH 1/4] resourcepolicy plugin implementation --- go.mod | 46 +-- go.sum | 57 ++-- pkg/resourcepolicy/resourcepolicy.go | 344 +++++++++++++++++++++ pkg/resourcepolicy/resourcepolicy_cache.go | 293 ++++++++++++++++++ pkg/resourcepolicy/resourcepolicyinfo.go | 239 ++++++++++++++ pkg/resourcepolicy/schedulingcontext.go | 11 + pkg/resourcepolicy/types.go | 180 +++++++++++ 7 files changed, 1123 insertions(+), 47 deletions(-) create mode 100644 pkg/resourcepolicy/resourcepolicy.go create mode 100644 pkg/resourcepolicy/resourcepolicy_cache.go create mode 100644 pkg/resourcepolicy/resourcepolicyinfo.go create mode 100644 pkg/resourcepolicy/schedulingcontext.go create mode 100644 pkg/resourcepolicy/types.go diff --git a/go.mod b/go.mod index b7aac3395..ccae6a5b3 100644 --- a/go.mod +++ b/go.mod @@ -1,8 +1,9 @@ module sigs.k8s.io/scheduler-plugins -go 1.23.0 +go 1.24.2 require ( + github.com/KunWuLuan/resourcepolicyapi v0.0.0-20250429133120-0b5462735cdc github.com/containers/common v0.46.0 github.com/diktyo-io/appgroup-api v1.0.1-alpha github.com/diktyo-io/networktopology-api v1.0.1-alpha @@ -16,21 +17,21 @@ require ( github.com/spf13/pflag v1.0.5 github.com/stretchr/testify v1.9.0 gonum.org/v1/gonum v0.12.0 - k8s.io/api v0.32.5 - k8s.io/apimachinery v0.32.5 + k8s.io/api v0.33.0 + k8s.io/apimachinery v0.33.0 k8s.io/apiserver v0.32.5 k8s.io/client-go v0.32.5 - k8s.io/code-generator v0.32.5 + k8s.io/code-generator v0.33.0 k8s.io/component-base v0.32.5 k8s.io/component-helpers v0.32.5 k8s.io/klog/v2 v2.130.1 - k8s.io/kube-scheduler v0.32.5 - k8s.io/kubernetes v1.32.5 + k8s.io/kube-scheduler v0.31.8 + k8s.io/kubernetes v1.31.8 k8s.io/utils v0.0.0-20241104100929-3ea5e8cea738 - sigs.k8s.io/controller-runtime v0.20.4 + sigs.k8s.io/controller-runtime v0.19.2 sigs.k8s.io/logtools v0.9.0 sigs.k8s.io/security-profiles-operator v0.4.0 - sigs.k8s.io/structured-merge-diff/v4 v4.4.2 + sigs.k8s.io/structured-merge-diff/v4 v4.6.0 sigs.k8s.io/yaml v1.4.0 ) @@ -63,9 +64,9 @@ require ( github.com/go-openapi/swag v0.23.0 // indirect github.com/gogo/protobuf v1.3.2 // indirect github.com/golang/protobuf v1.5.4 // indirect - github.com/google/btree v1.1.3 // indirect + github.com/google/btree v1.0.1 // indirect github.com/google/cel-go v0.22.0 // indirect - github.com/google/gnostic-models v0.6.8 // indirect + github.com/google/gnostic-models v0.6.9 // indirect github.com/google/gofuzz v1.2.0 // indirect github.com/google/uuid v1.6.0 // indirect github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 // indirect @@ -107,39 +108,40 @@ require ( go.opentelemetry.io/proto/otlp v1.3.1 // indirect go.uber.org/multierr v1.11.0 // indirect go.uber.org/zap v1.27.0 // indirect - golang.org/x/crypto v0.31.0 // indirect + golang.org/x/crypto v0.36.0 // indirect golang.org/x/exp v0.0.0-20240719175910-8a7402abbf56 // indirect golang.org/x/mod v0.21.0 // indirect - golang.org/x/net v0.33.0 // indirect + golang.org/x/net v0.38.0 // indirect golang.org/x/oauth2 v0.23.0 // indirect - golang.org/x/sync v0.10.0 // indirect - golang.org/x/sys v0.28.0 // indirect - golang.org/x/term v0.27.0 // indirect - golang.org/x/text v0.21.0 // indirect + golang.org/x/sync v0.12.0 // indirect + golang.org/x/sys v0.31.0 // indirect + golang.org/x/term v0.30.0 // indirect + golang.org/x/text v0.23.0 // indirect golang.org/x/time v0.7.0 // indirect golang.org/x/tools v0.26.0 // indirect gomodules.xyz/jsonpatch/v2 v2.4.0 // 
indirect google.golang.org/genproto/googleapis/api v0.0.0-20240826202546-f6391c0de4c7 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20240826202546-f6391c0de4c7 // indirect google.golang.org/grpc v1.65.0 // indirect - google.golang.org/protobuf v1.35.1 // indirect + google.golang.org/protobuf v1.36.5 // indirect gopkg.in/evanphx/json-patch.v4 v4.12.0 // indirect gopkg.in/inf.v0 v0.9.1 // indirect gopkg.in/natefinch/lumberjack.v2 v2.2.1 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect - k8s.io/apiextensions-apiserver v0.32.1 // indirect + k8s.io/apiextensions-apiserver v0.31.8 // indirect k8s.io/cloud-provider v0.32.5 // indirect k8s.io/controller-manager v0.32.5 // indirect - k8s.io/csi-translation-lib v0.32.5 // indirect - k8s.io/dynamic-resource-allocation v0.32.5 // indirect - k8s.io/gengo/v2 v2.0.0-20240911193312-2b36238f13e9 // indirect + k8s.io/csi-translation-lib v0.31.8 // indirect + k8s.io/dynamic-resource-allocation v0.31.8 // indirect + k8s.io/gengo/v2 v2.0.0-20250207200755-1244d31929d7 // indirect k8s.io/kms v0.32.5 // indirect - k8s.io/kube-openapi v0.0.0-20241105132330-32ad38e42d3f // indirect + k8s.io/kube-openapi v0.0.0-20250318190949-c8a335a9a2ff // indirect k8s.io/kubelet v0.32.5 // indirect k8s.io/metrics v0.32.5 // indirect sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.31.0 // indirect sigs.k8s.io/json v0.0.0-20241010143419-9aa6b5e7a4b3 // indirect + sigs.k8s.io/randfill v1.0.0 // indirect sigs.k8s.io/release-utils v0.3.0 // indirect ) diff --git a/go.sum b/go.sum index 32857b94e..d31151e0e 100644 --- a/go.sum +++ b/go.sum @@ -1342,6 +1342,8 @@ github.com/BurntSushi/toml v1.3.2/go.mod h1:CxXYINrC8qIiEnFrOxCa7Jy5BFHlXnUU2pbi github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo= github.com/JeffAshton/win_pdh v0.0.0-20161109143554-76bb4ee9f0ab/go.mod h1:3VYc5hodBMJ5+l/7J4xAyMeuM2PNuepvHlGs8yilUCA= github.com/JohnCGriffin/overflow v0.0.0-20211019200055-46fa312c352c/go.mod h1:X0CRv0ky0k6m906ixxpzmDRLvX58TFUKS2eePweuyxk= +github.com/KunWuLuan/resourcepolicyapi v0.0.0-20250429133120-0b5462735cdc h1:tQ0XOIup39emiznGpxgV+PQAeqKvr0DeLfZf9ReqQfc= +github.com/KunWuLuan/resourcepolicyapi v0.0.0-20250429133120-0b5462735cdc/go.mod h1:6G5vMUV/xm+IEwNrNaxvZZr6WiqAHq+o+YsU0TiwXO0= github.com/MakeNowJust/heredoc v1.0.0/go.mod h1:mG5amYoWBHf8vpLOuehzbGGw0EHxpZZ6lCpQ4fNJ8LE= github.com/Microsoft/go-winio v0.4.11/go.mod h1:VhR8bwka0BXejwEJY73c50VrPtXAaKcyvVC4A4RozmA= github.com/Microsoft/go-winio v0.4.14/go.mod h1:qXqCSQ3Xa7+6tgxaGTIe4Kpcdsi+P8jBhyzoq1bpyYA= @@ -1906,16 +1908,16 @@ github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEW github.com/gomarkdown/markdown v0.0.0-20210514010506-3b9f47219fe7/go.mod h1:aii0r/K0ZnHv7G0KF7xy1v0A7s2Ljrb5byB7MO5p6TU= github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= github.com/google/btree v1.0.0/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= +github.com/google/btree v1.0.1 h1:gK4Kx5IaGY9CD5sPJ36FHiBJ6ZXl0kilRiiCj+jdYp4= github.com/google/btree v1.0.1/go.mod h1:xXMiIv4Fb/0kKde4SpL7qlzvu5cMJDRkFDxJfI9uaxA= -github.com/google/btree v1.1.3 h1:CVpQJjYgC4VbzxeGVHfvZrv1ctoYCAI8vbl07Fcxlyg= -github.com/google/btree v1.1.3/go.mod h1:qOPhT0dTNdNzV6Z/lhRX0YXUafgPLFUh+gZMl761Gm4= github.com/google/cadvisor v0.51.0/go.mod h1:czGE/c/P/i0QFpVNKTFrIEzord9Y10YfpwuaSWXELc0= github.com/google/cel-go v0.22.0 h1:b3FJZxpiv1vTMo2/5RDUqAHPxkT8mmMfJIrq1llbf7g= 
github.com/google/cel-go v0.22.0/go.mod h1:BuznPXXfQDpXKWQ9sPW3TzlAJN5zzFe+i9tIs0yC4s8= github.com/google/flatbuffers v2.0.8+incompatible/go.mod h1:1AeVuKshWv4vARoZatz6mlQ0JxURH0Kv5+zNeJKJCa8= github.com/google/flatbuffers v23.5.26+incompatible/go.mod h1:1AeVuKshWv4vARoZatz6mlQ0JxURH0Kv5+zNeJKJCa8= -github.com/google/gnostic-models v0.6.8 h1:yo/ABAfM5IMRsS1VnXjTBvUb61tFIHozhlYvRgGre9I= github.com/google/gnostic-models v0.6.8/go.mod h1:5n7qKqH0f5wFt+aWF8CW6pZLLNOfYuF5OpfBSENuI8U= +github.com/google/gnostic-models v0.6.9 h1:MU/8wDLif2qCXZmzncUQ/BOfxWfthHi63KqpoNbWqVw= +github.com/google/gnostic-models v0.6.9/go.mod h1:CiWsm0s6BSQd1hRn8/QmxqB6BesYcbSZxsz9b0KuDBw= github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M= github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= @@ -2307,9 +2309,8 @@ github.com/onsi/ginkgo/v2 v2.17.1/go.mod h1:llBI3WDLL9Z6taip6f33H76YcWtJv+7R3Hig github.com/onsi/ginkgo/v2 v2.17.2/go.mod h1:nP2DPOQoNsQmsVyv5rDA8JkXQoCs6goXIvr/PRJ1eCc= github.com/onsi/ginkgo/v2 v2.19.0/go.mod h1:rlwLi9PilAFJ8jCg9UE1QP6VBpd6/xj3SRC0d6TU0To= github.com/onsi/ginkgo/v2 v2.20.1/go.mod h1:lG9ey2Z29hR41WMVthyJBGUBcBhGOtoPF2VFMvBXFCI= +github.com/onsi/ginkgo/v2 v2.21.0 h1:7rg/4f3rB88pb5obDgNZrNHrQ4e6WpjonchcpuBRnZM= github.com/onsi/ginkgo/v2 v2.21.0/go.mod h1:7Du3c42kxCUegi0IImZ1wUQzMBVecgIHjR1C+NkhLQo= -github.com/onsi/ginkgo/v2 v2.22.0 h1:Yed107/8DjTr0lKCNt7Dn8yQ6ybuDRQoMGrNFKzMfHg= -github.com/onsi/ginkgo/v2 v2.22.0/go.mod h1:7Du3c42kxCUegi0IImZ1wUQzMBVecgIHjR1C+NkhLQo= github.com/onsi/gomega v0.0.0-20151007035656-2152b45fa28a/go.mod h1:C1qb7wdrVGGVU+Z6iS04AVkA3Q65CEZX59MT0QO5uiA= github.com/onsi/gomega v1.5.0/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY= github.com/onsi/gomega v1.7.0/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY= @@ -2341,9 +2342,8 @@ github.com/onsi/gomega v1.33.0/go.mod h1:+925n5YtiFsLzzafLUHzVMBpvvRAzrydIBiSIxj github.com/onsi/gomega v1.33.1/go.mod h1:U4R44UsT+9eLIaYRB2a5qajjtQYn0hauxvRm16AVYg0= github.com/onsi/gomega v1.34.1/go.mod h1:kU1QgUvBDLXBJq618Xvm2LUX6rSAfRaFRTcdOeDLwwY= github.com/onsi/gomega v1.34.2/go.mod h1:v1xfxRgk0KIsG+QOdm7p8UosrOzPYRo60fd3B/1Dukc= +github.com/onsi/gomega v1.35.1 h1:Cwbd75ZBPxFSuZ6T+rN/WCb/gOc6YgFBXLlZLhC7Ds4= github.com/onsi/gomega v1.35.1/go.mod h1:PvZbdDc8J6XJEpDK4HCuRBm8a6Fzp9/DmhC9C7yFlog= -github.com/onsi/gomega v1.36.1 h1:bJDPBO7ibjxcbHMgSCoo4Yj18UWbKDlLwX1x9sybDcw= -github.com/onsi/gomega v1.36.1/go.mod h1:PvZbdDc8J6XJEpDK4HCuRBm8a6Fzp9/DmhC9C7yFlog= github.com/opencontainers/go-digest v0.0.0-20170106003457-a6d0ee40d420/go.mod h1:cMLVZDEM3+U2I4VmLI6N8jQYUd2OVphdqWwCJHrFt2s= github.com/opencontainers/go-digest v0.0.0-20180430190053-c9281466c8b2/go.mod h1:cMLVZDEM3+U2I4VmLI6N8jQYUd2OVphdqWwCJHrFt2s= github.com/opencontainers/go-digest v1.0.0-rc1/go.mod h1:cMLVZDEM3+U2I4VmLI6N8jQYUd2OVphdqWwCJHrFt2s= @@ -2848,8 +2848,8 @@ golang.org/x/crypto v0.24.0/go.mod h1:Z1PMYSOR5nyMcyAVAIQSKCDwalqy85Aqn1x3Ws4L5D golang.org/x/crypto v0.25.0/go.mod h1:T+wALwcMOSE0kXgUAnPAHqTLW+XHgcELELW8VaDgm/M= golang.org/x/crypto v0.26.0/go.mod h1:GY7jblb9wI+FOo5y8/S2oY4zWP07AkOJ4+jxCqdqn54= golang.org/x/crypto v0.28.0/go.mod h1:rmgy+3RHxRZMyY0jjAJShp2zgEdOqj2AO7U0pYmeQ7U= -golang.org/x/crypto v0.31.0 h1:ihbySMvVjLAeSH1IbfcRTkD/iNscyz8rGzjF/E5hV6U= -golang.org/x/crypto v0.31.0/go.mod h1:kDsLvtWBEx7MV9tJOj9bnXsPbxwJQ6csT/x4KIN4Ssk= +golang.org/x/crypto v0.36.0 
h1:AnAEvhDddvBdpY+uR+MyHmuZzzNqXSe/GvuDeob5L34= +golang.org/x/crypto v0.36.0/go.mod h1:Y4J0ReaxCR1IMaabaSMugxJES1EpwhBHhv2bDHklZvc= golang.org/x/exp v0.0.0-20180321215751-8460e604b9de/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20180807140117-3d87b88a115f/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= @@ -3029,8 +3029,8 @@ golang.org/x/net v0.26.0/go.mod h1:5YKkiSynbBIh3p6iOc/vibscux0x38BZDkn8sCUPxHE= golang.org/x/net v0.27.0/go.mod h1:dDi0PyhWNoiUOrAS8uXv/vnScO4wnHQO4mj9fn/RytE= golang.org/x/net v0.28.0/go.mod h1:yqtgsTWOOnlGLG9GFRrK3++bGOUEkNBoHZc8MEDWPNg= golang.org/x/net v0.30.0/go.mod h1:2wGyMJ5iFasEhkwi13ChkO/t1ECNC4X4eBKkVFyYFlU= -golang.org/x/net v0.33.0 h1:74SYHlV8BIgHIFC/LrYkOGIwL19eTYXQ5wc6TBuO36I= -golang.org/x/net v0.33.0/go.mod h1:HXLR5J+9DxmrqMwG9qjGCxZ+zKXxBru04zlTvWlWuN4= +golang.org/x/net v0.38.0 h1:vRMAPTMaeGqVhG5QyLJHqNDwecKTomGeqbnfZyKlBI8= +golang.org/x/net v0.38.0/go.mod h1:ivrbrMbzFq5J41QOQh0siUuly180yBYtLp+CKbEaFx8= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20181017192945-9dcd33a902f4/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20181106182150-f42d05182288/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= @@ -3103,8 +3103,8 @@ golang.org/x/sync v0.5.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sync v0.6.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sync v0.8.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= -golang.org/x/sync v0.10.0 h1:3NQrjDixjgGwUOCaF8w2+VYHv0Ve/vGYSbdkTa98gmQ= -golang.org/x/sync v0.10.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= +golang.org/x/sync v0.12.0 h1:MHc5BpPuC30uJk597Ri8TV3CNZcTLu6B6z4lJy+g6Jw= +golang.org/x/sync v0.12.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA= golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= @@ -3266,8 +3266,8 @@ golang.org/x/sys v0.22.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/sys v0.23.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/sys v0.24.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/sys v0.26.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= -golang.org/x/sys v0.28.0 h1:Fksou7UEQUWlKvIdsqzJmUmCX3cZuD2+P3XyyzwMhlA= -golang.org/x/sys v0.28.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.31.0 h1:ioabZlmFYtWhL+TRYpcnNlLwhyxaM9kWTDEmfnprqik= +golang.org/x/sys v0.31.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k= golang.org/x/telemetry v0.0.0-20240228155512-f48c80bd79b2/go.mod h1:TeRTkGYfJXctD9OcfyVLyj2J3IxLnKwHJR8f4D8a3YE= golang.org/x/telemetry v0.0.0-20240521205824-bda55230c457/go.mod h1:pRgIJT+bRLFKnoM1ldnzKoxTIn14Yxz928LQRYYgIN0= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= @@ -3296,8 +3296,8 @@ golang.org/x/term v0.21.0/go.mod h1:ooXLefLobQVslOqselCNF4SxFAaoS6KujMbsGzSDmX0= golang.org/x/term v0.22.0/go.mod 
h1:F3qCibpT5AMpCRfhfT53vVJwhLtIVHhB9XDjfFvnMI4= golang.org/x/term v0.23.0/go.mod h1:DgV24QBUrK6jhZXl+20l6UWznPlwAHm1Q1mGHtydmSk= golang.org/x/term v0.25.0/go.mod h1:RPyXicDX+6vLxogjjRxjgD2TKtmAO6NZBsBRfrOLu7M= -golang.org/x/term v0.27.0 h1:WP60Sv1nlK1T6SupCHbXzSaN0b9wUmsPoRS9b61A23Q= -golang.org/x/term v0.27.0/go.mod h1:iMsnZpn0cago0GOrHO2+Y7u7JPn5AylBrcoWkElMTSM= +golang.org/x/term v0.30.0 h1:PQ39fJZ+mfadBm0y5WlL4vlM7Sx1Hgf13sMIY2+QS9Y= +golang.org/x/term v0.30.0/go.mod h1:NYYFdzHoI5wRh/h5tDMdMqCqPJZEuNqVR5xJLd/n67g= golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= @@ -3323,8 +3323,8 @@ golang.org/x/text v0.15.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= golang.org/x/text v0.16.0/go.mod h1:GhwF1Be+LQoKShO3cGOHzqOgRrGaYc9AvblQOmPVHnI= golang.org/x/text v0.17.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY= golang.org/x/text v0.19.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY= -golang.org/x/text v0.21.0 h1:zyQAAkrwaneQ066sspRyJaG9VNi/YJ1NfzcGB3hZ/qo= -golang.org/x/text v0.21.0/go.mod h1:4IBbMaMmOPCJ8SecivzSH54+73PCFmPWxNTLm+vZkEQ= +golang.org/x/text v0.23.0 h1:D71I7dUrlY+VX0gQShAThNGHFxZ13dGLBHQLVl1mJlY= +golang.org/x/text v0.23.0/go.mod h1:/BLNzu4aZCJ1+kcD0DNRotWKage4q2rGVAg4o22unh4= golang.org/x/time v0.0.0-20180412165947-fbb02b2291d2/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= @@ -3894,8 +3894,9 @@ google.golang.org/protobuf v1.32.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHh google.golang.org/protobuf v1.33.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos= google.golang.org/protobuf v1.34.1/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos= google.golang.org/protobuf v1.34.2/go.mod h1:qYOHts0dSfpeUzUFpOMr/WGzszTmLH+DiWniOlNbLDw= -google.golang.org/protobuf v1.35.1 h1:m3LfL6/Ca+fqnjnlqQXNpFPABW1UD7mjh8KO2mKFytA= google.golang.org/protobuf v1.35.1/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE= +google.golang.org/protobuf v1.36.5 h1:tPhr+woSbjfYvY6/GPufUoYizxw1cF/yFoxJ2fmpwlM= +google.golang.org/protobuf v1.36.5/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE= gopkg.in/airbrake/gobrake.v2 v2.0.9/go.mod h1:/h5ZAUhDkGaJfjzjKLSjv6zCL6O0LLBxU4K+aSYdM/U= gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= @@ -3987,8 +3988,9 @@ k8s.io/endpointslice v0.32.5/go.mod h1:jDpGlovH8wjHNibzlMon738wvBIZxw7gS6ngyjvbY k8s.io/externaljwt v0.32.5/go.mod h1:P9TZ/u+o3CG//KNc/2HJmKgnuvawWS75IosS9dlGlxI= k8s.io/gengo v0.0.0-20230829151522-9cce18d56c01/go.mod h1:FiNAH4ZV3gBg2Kwh89tzAEV2be7d5xI0vBa/VySYy3E= k8s.io/gengo/v2 v2.0.0-20240826214909-a7b603a56eb7/go.mod h1:EJykeLsmFC60UQbYJezXkEsG2FLrt0GPNkU5iK5GWxU= -k8s.io/gengo/v2 v2.0.0-20240911193312-2b36238f13e9 h1:si3PfKm8dDYxgfbeA6orqrtLkvvIeH8UqffFJDl0bz4= k8s.io/gengo/v2 v2.0.0-20240911193312-2b36238f13e9/go.mod h1:EJykeLsmFC60UQbYJezXkEsG2FLrt0GPNkU5iK5GWxU= +k8s.io/gengo/v2 v2.0.0-20250207200755-1244d31929d7 h1:2OX19X59HxDprNCVrWi6jb7LW1PoqTlYqEq5H2oetog= +k8s.io/gengo/v2 
v2.0.0-20250207200755-1244d31929d7/go.mod h1:EJykeLsmFC60UQbYJezXkEsG2FLrt0GPNkU5iK5GWxU= k8s.io/klog v0.3.0/go.mod h1:Gq+BEi5rUBO/HRz0bTSXDUcqjScdoY3a9IHpCEIOOfk= k8s.io/klog/v2 v2.0.0/go.mod h1:PBfzABfn139FHAV07az/IF9Wp1bkk3vpT2XSJ76fSDE= k8s.io/klog/v2 v2.2.0/go.mod h1:Od+F08eJP+W3HUb4pSrPpgp9DGU4GzlpG/TmITuYh/Y= @@ -4003,8 +4005,9 @@ k8s.io/kms v0.32.5/go.mod h1:Bk2evz/Yvk0oVrvm4MvZbgq8BD34Ksxs2SRHn4/UiOM= k8s.io/kube-aggregator v0.32.5/go.mod h1:YQfS6iOH6WsWDFyqbNI/J7M5Vf0cWqjaKLH9zGgYCuI= k8s.io/kube-controller-manager v0.32.5/go.mod h1:lXUDCoumjKXosoHGf9F1q8Qw2TPzteNVXqcro2NV5Is= k8s.io/kube-openapi v0.0.0-20231010175941-2dd684a91f00/go.mod h1:AsvuZPBlUDVuCdzJ87iajxtXuR9oktsTctW/R9wwouA= -k8s.io/kube-openapi v0.0.0-20241105132330-32ad38e42d3f h1:GA7//TjRY9yWGy1poLzYYJJ4JRdzg3+O6e8I+e+8T5Y= k8s.io/kube-openapi v0.0.0-20241105132330-32ad38e42d3f/go.mod h1:R/HEjbvWI0qdfb8viZUeVZm0X6IZnxAydC7YU42CMw4= +k8s.io/kube-openapi v0.0.0-20250318190949-c8a335a9a2ff h1:/usPimJzUKKu+m+TE36gUyGcf03XZEP0ZIKgKj35LS4= +k8s.io/kube-openapi v0.0.0-20250318190949-c8a335a9a2ff/go.mod h1:5jIi+8yX4RIb8wk3XwBo5Pq2ccx4FP10ohkbSKCZoK8= k8s.io/kube-proxy v0.32.5/go.mod h1:1FhoUGg/xu7FcxF7MWLpFzHP5si0gaRiF1SH8fg+L8E= k8s.io/kube-scheduler v0.32.5 h1:sPlell9o0IAD12PanESrWTUcZeUaz9lQMBzNZzZ66/8= k8s.io/kube-scheduler v0.32.5/go.mod h1:jTl+IZEujDywfLvcyHC6tOVQVz8vRct/vtoqikqsBms= @@ -4098,8 +4101,8 @@ sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.31.0 h1:CPT0ExVicCzcp sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.31.0/go.mod h1:Ve9uj1L+deCXFrPOk1LpFXqTg7LCFzFso6PA48q/XZw= sigs.k8s.io/controller-runtime v0.9.2/go.mod h1:TxzMCHyEUpaeuOiZx/bIdc2T81vfs/aKdvJt9wuu0zk= sigs.k8s.io/controller-runtime v0.10.3/go.mod h1:CQp8eyUQZ/Q7PJvnIrB6/hgfTC1kBkGylwsLgOQi1WY= -sigs.k8s.io/controller-runtime v0.20.4 h1:X3c+Odnxz+iPTRobG4tp092+CvBU9UK0t/bRf+n0DGU= -sigs.k8s.io/controller-runtime v0.20.4/go.mod h1:xg2XB0K5ShQzAgsoujxuKN4LNXR2LfwwHsPj7Iaw+XY= +sigs.k8s.io/controller-runtime v0.19.2 h1:3sPrF58XQEPzbE8T81TN6selQIMGbtYwuaJ6eDssDF8= +sigs.k8s.io/controller-runtime v0.19.2/go.mod h1:iRmWllt8IlaLjvTTDLhRBXIEtkCK6hwVBJJsYS9Ajf4= sigs.k8s.io/controller-tools v0.2.4/go.mod h1:m/ztfQNocGYBgTTCmFdnK94uVvgxeZeE3LtJvd/jIzA= sigs.k8s.io/controller-tools v0.7.0/go.mod h1:bpBAo0VcSDDLuWt47evLhMLPxRPxMDInTEH/YbdeMK0= sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd/go.mod h1:B8JuhiUyNFVKdsE8h686QcCxMaH6HrOAZj4vswFpcB0= @@ -4113,6 +4116,9 @@ sigs.k8s.io/kustomize/kyaml v0.18.1/go.mod h1:C3L2BFVU1jgcddNBE1TxuVLgS46TjObMwW sigs.k8s.io/logtools v0.9.0 h1:IMP/HTDkfM6rg6os/tcEjmQeIHyOyu/enduM/cOPGNQ= sigs.k8s.io/logtools v0.9.0/go.mod h1:74Z5BP7ehrMHi/Q31W1gSf8YgwT/4GPjVH5xPSPeZA0= sigs.k8s.io/mdtoc v1.1.0/go.mod h1:QZLVEdHH2iNIR4uHAZyvFRtjloHgVItk8lo/mzCtq3w= +sigs.k8s.io/randfill v0.0.0-20250304075658-069ef1bbf016/go.mod h1:XeLlZ/jmk4i1HRopwe7/aU3H5n1zNUcX6TM94b3QxOY= +sigs.k8s.io/randfill v1.0.0 h1:JfjMILfT8A6RbawdsK2JXGBR5AQVfd+9TbzrlneTyrU= +sigs.k8s.io/randfill v1.0.0/go.mod h1:XeLlZ/jmk4i1HRopwe7/aU3H5n1zNUcX6TM94b3QxOY= sigs.k8s.io/release-utils v0.2.0/go.mod h1:9O5livl2h3Q56jUkoZ7UnV22XVRB6MuD4l/51C2vAPg= sigs.k8s.io/release-utils v0.3.0 h1:cyNeXvm+2lPn67f4MWmq9xapZDAI5hekpT7iQPRxta4= sigs.k8s.io/release-utils v0.3.0/go.mod h1:J9xpziRNRI4mAeMZxPRryDodQMoMudMu6yC1aViFHU4= @@ -4120,8 +4126,9 @@ sigs.k8s.io/security-profiles-operator v0.4.0 h1:TwSTQqHYngd9EZFskV3M+sTnyj6aExA sigs.k8s.io/security-profiles-operator v0.4.0/go.mod h1:aqtxq1T5+UWQpFEsfGCiUnY4p+s2KZoqQMETkBDlrrc= sigs.k8s.io/structured-merge-diff/v4 
v4.1.2/go.mod h1:j/nl6xW8vLS49O8YvXW1ocPhZawJtm+Yrr7PPRQ0Vg4= sigs.k8s.io/structured-merge-diff/v4 v4.2.3/go.mod h1:qjx8mGObPmV2aSZepjQjbmb2ihdVs8cGKBraizNC69E= -sigs.k8s.io/structured-merge-diff/v4 v4.4.2 h1:MdmvkGuXi/8io6ixD5wud3vOLwc1rj0aNqRlpuvjmwA= sigs.k8s.io/structured-merge-diff/v4 v4.4.2/go.mod h1:N8f93tFZh9U6vpxwRArLiikrE5/2tiu1w1AGfACIGE4= +sigs.k8s.io/structured-merge-diff/v4 v4.6.0 h1:IUA9nvMmnKWcj5jl84xn+T5MnlZKThmUW1TdblaLVAc= +sigs.k8s.io/structured-merge-diff/v4 v4.6.0/go.mod h1:dDy58f92j70zLsuZVuUX5Wp9vtxXpaZnkPGWeqDfCps= sigs.k8s.io/yaml v1.1.0/go.mod h1:UJmg0vDUVViEyp3mgSv9WPwZCDxu4rQW1olrI1uml+o= sigs.k8s.io/yaml v1.2.0/go.mod h1:yfXDCHCao9+ENCvLSE62v9VSji2MKu5jeNfTrofGhJc= sigs.k8s.io/yaml v1.4.0 h1:Mk1wCc2gy/F0THH0TAp1QYyJNzRm2KCLy3o5ASXVI5E= diff --git a/pkg/resourcepolicy/resourcepolicy.go b/pkg/resourcepolicy/resourcepolicy.go new file mode 100644 index 000000000..2016ad9ec --- /dev/null +++ b/pkg/resourcepolicy/resourcepolicy.go @@ -0,0 +1,344 @@ +package resourcepolicy + +import ( + "context" + "fmt" + "math" + + "github.com/KunWuLuan/resourcepolicyapi/pkg/apis/scheduling/v1alpha1" + + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/scheduler-plugins/pkg/util" + + v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/client-go/tools/cache" + "k8s.io/klog/v2" + "k8s.io/kubernetes/pkg/api/v1/resource" + "k8s.io/kubernetes/pkg/scheduler/framework" +) + +const Name = "ResourcePolicy" + +type resourcePolicyPlugin struct { + cache *resourcePolicyCache + + handle framework.Handle + + client client.Client + + schedulingCtx map[keyStr]*schedulingContext +} + +var _ framework.PreFilterPlugin = &resourcePolicyPlugin{} +var _ framework.FilterPlugin = &resourcePolicyPlugin{} +var _ framework.ScorePlugin = &resourcePolicyPlugin{} +var _ framework.ReservePlugin = &resourcePolicyPlugin{} +var _ framework.PreBindPlugin = &resourcePolicyPlugin{} + +func (rspp *resourcePolicyPlugin) Name() string { + return Name +} + +func New(ctx context.Context, args runtime.Object, handle framework.Handle) (framework.Plugin, error) { + lh := klog.FromContext(ctx).WithValues("plugin", Name) + lh.V(5).Info("creating new coscheduling plugin") + + podLister := handle.SharedInformerFactory().Core().V1().Pods().Lister() + nodeLister := handle.SharedInformerFactory().Core().V1().Nodes().Lister() + nodeInfoSnapshot := handle.SnapshotSharedLister().NodeInfos() + rspCache := NewResourcePolicyCache( + nodeLister, + podLister, + nodeInfoSnapshot, + ) + + scheme := runtime.NewScheme() + _ = v1.AddToScheme(scheme) + _ = v1alpha1.AddToScheme(scheme) + c, ccache, err := util.NewClientWithCachedReader(ctx, handle.KubeConfig(), scheme) + if err != nil { + return nil, err + } + + rspInformer, _ := ccache.GetInformerForKind(ctx, v1alpha1.SchemeGroupVersion.WithKind("ResourcePolicy")) + rspInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + rsp, ok := obj.(*v1alpha1.ResourcePolicy) + if !ok { + return + } + rspCache.AddOrUpdateResPolicy(rsp) + }, + UpdateFunc: func(oldObj, newObj interface{}) { + rsp, ok := newObj.(*v1alpha1.ResourcePolicy) + if !ok { + return + } + rspCache.AddOrUpdateResPolicy(rsp) + }, + DeleteFunc: func(obj interface{}) { + switch t := obj.(type) { + case *v1alpha1.ResourcePolicy: + rsp := t + rspCache.DeleteResourcePolicy(rsp) + case cache.DeletedFinalStateUnknown: + rsp, ok := t.Obj.(*v1alpha1.ResourcePolicy) + if !ok { + return + } + rspCache.DeleteResourcePolicy(rsp) + } + }, + }) + + 
handle.SharedInformerFactory().Core().V1().Nodes().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{}) + + handle.SharedInformerFactory().Core().V1().Pods().Informer().AddIndexers(cache.Indexers{ + ManagedByResourcePolicyIndexKey: func(obj interface{}) ([]string, error) { + return []string{string(GetManagedResourcePolicy(obj.(*v1.Pod)))}, nil + }, + }) + handle.SharedInformerFactory().Core().V1().Pods().Informer().AddEventHandler(cache.FilteringResourceEventHandler{ + FilterFunc: func(obj interface{}) bool { + switch t := obj.(type) { + case *v1.Pod: + return GetManagedResourcePolicy(t) != "" && t.Spec.NodeName != "" + case cache.DeletedFinalStateUnknown: + if pod, ok := t.Obj.(*v1.Pod); ok { + return GetManagedResourcePolicy(pod) != "" && pod.Spec.NodeName != "" + } + return false + default: + return false + } + }, + Handler: cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + pd, ok := obj.(*v1.Pod) + if !ok { + return + } + rspCache.AddOrUpdateBoundPod(pd) + }, + UpdateFunc: func(oldObj, newObj interface{}) { + pd, ok := newObj.(*v1.Pod) + if !ok { + return + } + rspCache.AddOrUpdateBoundPod(pd) + }, + DeleteFunc: func(obj interface{}) { + var pd *v1.Pod + switch t := obj.(type) { + case *v1.Pod: + pd = t + case cache.DeletedFinalStateUnknown: + var ok bool + pd, ok = t.Obj.(*v1.Pod) + if !ok { + return + } + } + rspCache.DeleteBoundPod(pd) + }, + }, + }) + + plg := &resourcePolicyPlugin{ + cache: &resourcePolicyCache{ + pd2Rps: make(map[keyStr]keyStr), + rps: make(map[string]map[keyStr]*resourcePolicyInfo), + }, + client: c, + handle: handle, + } + return plg, nil +} + +func (rspp *resourcePolicyPlugin) EventsToRegister(_ context.Context) ([]framework.ClusterEventWithHint, error) { + return []framework.ClusterEventWithHint{ + {}, + }, nil +} + +func (rspp *resourcePolicyPlugin) PreFilter(ctx context.Context, state *framework.CycleState, + pod *v1.Pod) (*framework.PreFilterResult, *framework.Status) { + var matched *resourcePolicyInfo + var schedCtx *schedulingContext + var ok bool + rspp.cache.processingLock.RLock() + podKey := GetKeyStr(pod.ObjectMeta) + if schedCtx, ok = rspp.schedulingCtx[podKey]; ok && schedCtx.matched != "" { + matched = rspp.cache.getResourcePolicyInfoByKey(schedCtx.matched, pod.Namespace) + if matched == nil || matched.rv != schedCtx.resourceVersion { + schedCtx.matched = "" + matched = nil + } + } else { + rspp.schedulingCtx[podKey] = &schedulingContext{} + } + + if matched == nil { + resourcePoliciesInNamespace := rspp.cache.rps[pod.Namespace] + if len(resourcePoliciesInNamespace) == 0 { + rspp.cache.processingLock.RUnlock() + return nil, framework.NewStatus(framework.Skip) + } + + for _, rspi := range resourcePoliciesInNamespace { + if rspi.podSelector.Matches(labels.Set(pod.Labels)) { + if matched != nil { + rspp.cache.processingLock.RUnlock() + return nil, framework.NewStatus(framework.UnschedulableAndUnresolvable, "multiple resourcePolicies match pod") + } + matched = rspi + } + } + } + rspp.cache.processingLock.RUnlock() + + if matched == nil { + return nil, framework.NewStatus(framework.Skip, "no resourcePolicy matches pod") + } else { + schedCtx.matched = matched.ks + schedCtx.resourceVersion = matched.rv + schedCtx.unitIdx = 0 + } + + valid, labelKeyValue := genLabelKeyValueForPod(matched.policy, pod) + if !valid { + return nil, framework.NewStatus(framework.UnschedulableAndUnresolvable, "some labels not found in pod") + } + preFilterState := &ResourcePolicyPreFilterState{ + matchedInfo: matched, + podRes: 
framework.NewResource(resource.PodRequests(pod, resource.PodResourcesOptions{})), + labelKeyValue: labelKeyValue, + + currentCount: make([]int, len(matched.nodeSelectors)), + maxCount: make([]int, len(matched.nodeSelectors)), + maxConsumption: make([]*framework.Resource, len(matched.nodeSelectors)), + resConsumption: make([]*framework.Resource, len(matched.nodeSelectors)), + + nodeSelectos: make([]labels.Selector, len(matched.nodeSelectors)), + } + for idx, count := range matched.assumedPodCount { + preFilterState.currentCount[idx] = count[labelKeyValue] + } + for idx, consumption := range matched.assumedPodConsumption { + preFilterState.resConsumption[idx] = consumption[labelKeyValue] + } + for idx, max := range matched.maxPodResources { + preFilterState.maxConsumption[idx] = max.Clone() + } + copy(preFilterState.nodeSelectos, matched.nodeSelectors) + for idx, max := range matched.policy.Spec.Units { + if max.Max == nil { + preFilterState.maxCount[idx] = math.MaxInt32 + } else { + preFilterState.maxCount[idx] = int(*max.Max) + } + } + state.Write(ResourcePolicyPreFilterStateKey, preFilterState) + return nil, nil +} + +func (rspp *resourcePolicyPlugin) PreFilterExtensions() framework.PreFilterExtensions { + return nil +} + +type unitNotAvaiInfo struct { + idx int + res string +} + +func findAvailableUnitForNode(nodeInfo *framework.NodeInfo, state *ResourcePolicyPreFilterState) (int, []unitNotAvaiInfo) { + found := -1 + notValidIdx := []unitNotAvaiInfo{} + for idx := range state.nodeSelectos { + if !state.nodeSelectos[idx].Matches(labels.Set(nodeInfo.Node().Labels)) { + continue + } + if state.currentCount[idx] >= state.maxCount[idx] { + notValidIdx = append(notValidIdx, unitNotAvaiInfo{ + idx: idx, + res: "pod", + }) + continue + } + if lt, res := lessThan(state.resConsumption[idx], state.maxConsumption[idx]); lt { + notValidIdx = append(notValidIdx, unitNotAvaiInfo{ + idx: idx, + res: res, + }) + continue + } + found = idx + break + } + return found, notValidIdx +} + +func (rspp *resourcePolicyPlugin) Filter(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeInfo *framework.NodeInfo) *framework.Status { + obj, err := state.Read(ResourcePolicyPreFilterStateKey) + if err != nil { + return nil + } + preFilterState, ok := obj.(*ResourcePolicyPreFilterState) + if !ok { + return framework.AsStatus(fmt.Errorf("cannot convert %T to ResourcePolicyPreFilterState", obj)) + } + if avai, info := findAvailableUnitForNode(nodeInfo, preFilterState); avai == -1 { + return framework.NewStatus(framework.UnschedulableAndUnresolvable, fmt.Sprintf("unit not available: %v", info)) + } + return nil +} + +func (rspp *resourcePolicyPlugin) Score(ctx context.Context, state *framework.CycleState, p *v1.Pod, nodeName string) (int64, *framework.Status) { + obj, err := state.Read(ResourcePolicyPreFilterStateKey) + if err != nil { + return 0, nil + } + preFilterState, ok := obj.(*ResourcePolicyPreFilterState) + if !ok { + return 0, framework.AsStatus(fmt.Errorf("cannot convert %T to ResourcePolicyPreFilterState", obj)) + } + nodeInfo, err := rspp.handle.SnapshotSharedLister().NodeInfos().Get(nodeName) + if err != nil { + return 0, framework.NewStatus(framework.Error, fmt.Sprintf("getting node %q from Snapshot: %v", nodeName, err)) + } + if avai, info := findAvailableUnitForNode(nodeInfo, preFilterState); avai == -1 { + return 0, framework.NewStatus(framework.UnschedulableAndUnresolvable, fmt.Sprintf("unit not available: %v", info)) + } else { + return int64(100 - avai), nil + } +} + +func (rspp 
*resourcePolicyPlugin) ScoreExtensions() framework.ScoreExtensions { + return nil +} + +func (rspp *resourcePolicyPlugin) Reserve(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) *framework.Status { + nodeInfo, err := rspp.handle.SnapshotSharedLister().NodeInfos().Get(nodeName) + if err != nil { + return framework.NewStatus(framework.Error, fmt.Sprintf("getting node %q from Snapshot: %v", nodeName, err)) + } + err = rspp.cache.Assume(state, pod, nodeInfo.Node()) + if err != nil { + return framework.NewStatus(framework.Error, fmt.Sprintf("assuming pod %q: %v", pod.Name, err)) + } + return nil +} + +func (rspp *resourcePolicyPlugin) Unreserve(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) { + nodeInfo, err := rspp.handle.SnapshotSharedLister().NodeInfos().Get(nodeName) + if err != nil { + return + } + rspp.cache.Forget(state, pod, nodeInfo.Node()) +} + +func (rspp *resourcePolicyPlugin) PreBind(ctx context.Context, state *framework.CycleState, p *v1.Pod, nodeName string) *framework.Status { + return nil +} diff --git a/pkg/resourcepolicy/resourcepolicy_cache.go b/pkg/resourcepolicy/resourcepolicy_cache.go new file mode 100644 index 000000000..f8c5c4889 --- /dev/null +++ b/pkg/resourcepolicy/resourcepolicy_cache.go @@ -0,0 +1,293 @@ +package resourcepolicy + +import ( + "fmt" + "sync" + + "github.com/KunWuLuan/resourcepolicyapi/pkg/apis/scheduling/v1alpha1" + + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/types" + v1 "k8s.io/client-go/listers/core/v1" + "k8s.io/client-go/util/workqueue" + "k8s.io/klog/v2" + "k8s.io/kubernetes/pkg/api/v1/resource" + "k8s.io/kubernetes/pkg/scheduler/framework" +) + +type resourcePolicyCache struct { + processingLock sync.RWMutex + + // rps is map, keyed with namespace + // value is map, keyed with NamespaceName of ResourcePolicy + rps map[string]map[keyStr]*resourcePolicyInfo + // pd2Rps stores all pods that have been assumed + // pd2Rps is map, keyed with namespaceName of pod + // value is namespaceName of ResourcePolicy + pd2Rps map[keyStr]keyStr + assumedPd2Node map[keyStr]string + + wq workqueue.TypedRateLimitingInterface[types.NamespacedName] + + nl v1.NodeLister + pl v1.PodLister + ss framework.NodeInfoLister +} + +func NewResourcePolicyCache( + nl v1.NodeLister, + pl v1.PodLister, + ss framework.NodeInfoLister, +) *resourcePolicyCache { + return &resourcePolicyCache{ + rps: make(map[string]map[keyStr]*resourcePolicyInfo), + pd2Rps: make(map[keyStr]keyStr), + + wq: workqueue.NewTypedRateLimitingQueueWithConfig[types.NamespacedName]( + workqueue.DefaultTypedControllerRateLimiter[types.NamespacedName](), + workqueue.TypedRateLimitingQueueConfig[types.NamespacedName]{Name: "resourcepolicy"}), + + nl: nl, + pl: pl, + ss: ss, + } +} + +// get the lock outside +func (rspc *resourcePolicyCache) getResourcePolicyInfoByKey(key keyStr, namespace string) *resourcePolicyInfo { + rpsInNs := rspc.rps[namespace] + if len(rpsInNs) == 0 { + return nil + } + return rpsInNs[key] +} + +func (rspc *resourcePolicyCache) Assume(cycleState *framework.CycleState, pod *corev1.Pod, node *corev1.Node) error { + state, err := cycleState.Read(ResourcePolicyPreFilterStateKey) + if err != nil { + return nil + } + preFilterState, ok := state.(*ResourcePolicyPreFilterState) + if !ok { + return fmt.Errorf("unable to convert state to ResourcePolicyPreFilterState") + } + + rspc.processingLock.Lock() + defer rspc.processingLock.Unlock() + + podKey := GetKeyStr(pod.ObjectMeta) + if 
node, ok := rspc.assumedPd2Node[podKey]; ok { + return fmt.Errorf("PodAlreadyAssumed assumed node: %v", node) + } + + rspi := preFilterState.matchedInfo + for idx, sel := range rspi.nodeSelectors { + if !sel.Matches(labels.Set(node.Labels)) { + continue + } + rspi.addPodToBoundOrAssumedPods(rspi.assumedPods, idx, preFilterState.labelKeyValue, node.Name, podKey, preFilterState.podRes) + } + return nil +} + +func (rspc *resourcePolicyCache) Forget(cycleState *framework.CycleState, pod *corev1.Pod, node *corev1.Node) { + state, err := cycleState.Read(ResourcePolicyPreFilterStateKey) + if err != nil { + return + } + preFilterState, ok := state.(*ResourcePolicyPreFilterState) + if !ok { + return + } + + rspc.processingLock.Lock() + defer rspc.processingLock.Unlock() + + podKey := GetKeyStr(pod.ObjectMeta) + delete(rspc.assumedPd2Node, podKey) + + rspi := preFilterState.matchedInfo + for idx, sel := range rspi.nodeSelectors { + if !sel.Matches(labels.Set(node.Labels)) { + continue + } + rspi.removePodFromBoundOrAssumedPods(rspi.assumedPods, idx, preFilterState.labelKeyValue, node.Name, podKey) + } +} + +func (rspc *resourcePolicyCache) AddOrUpdateBoundPod(p *corev1.Pod) { + rspc.processingLock.Lock() + defer rspc.processingLock.Unlock() + + podKey := GetKeyStr(p.ObjectMeta) + rspkey := GetManagedResourcePolicy(p) + assumedRspKey := rspc.pd2Rps[podKey] + if assumedRspKey != rspkey { + klog.ErrorS(fmt.Errorf("AssignedResourcePolicyNotMatch"), "bound pod is managed by another resourcepolicy", "assumed", assumedRspKey, "bound", rspkey) + delete(rspc.pd2Rps, podKey) + + rsps, ok := rspc.rps[p.Namespace] + if ok { + rspi, ok := rsps[assumedRspKey] + if ok { + valid, labelKeyValue := genLabelKeyValueForPod(rspi.policy, p) + if valid { + rspi.processingLock.Lock() + rspi.removePod(podKey, labelKeyValue) + rspi.processingLock.Unlock() + } + } + } + } + + boundRsp := rspc.getResourcePolicyInfoByKey(rspkey, p.Namespace) + if boundRsp == nil { + klog.ErrorS(fmt.Errorf("ResourcePolicyInfoNotFound"), "bound pod is managed by a resourcepolicy that is not found", "bound", rspkey) + return + } + + nodeName := p.Spec.NodeName + if nodeName == "" { + klog.ErrorS(fmt.Errorf("PodNotBound"), "pod is not bound", "pod", klog.KObj(p)) + return + } + + node, err := rspc.ss.Get(nodeName) + if err != nil { + klog.ErrorS(err, "failed to get node", "node", nodeName) + return + } + + valid, labelKeyValue := genLabelKeyValueForPod(boundRsp.policy, p) + if !valid { + return + } + boundRsp.processingLock.Lock() + defer boundRsp.processingLock.Unlock() + podRes := resource.PodRequests(p, resource.PodResourcesOptions{}) + for idx, sel := range boundRsp.nodeSelectors { + if !sel.Matches(labels.Set(node.Node().Labels)) { + // TODO: remove pod from this count + // do not remove node from unit by update node values + continue + } + boundRsp.removePodFromBoundOrAssumedPods(boundRsp.assumedPods, idx, labelKeyValue, node.Node().Name, podKey) + boundRsp.addPodToBoundOrAssumedPods(boundRsp.boundPods, idx, labelKeyValue, nodeName, podKey, framework.NewResource(podRes)) + } +} + +func (rspc *resourcePolicyCache) DeleteBoundPod(p *corev1.Pod) { + rspc.processingLock.Lock() + defer rspc.processingLock.Unlock() + + podKey := GetKeyStr(p.ObjectMeta) + rspkey := GetManagedResourcePolicy(p) + rsp := rspc.getResourcePolicyInfoByKey(rspkey, p.Namespace) + if rsp != nil { + valid, labelKeyValue := genLabelKeyValueForPod(rsp.policy, p) + if valid { + rsp.removePod(podKey, labelKeyValue) + } + } + + assumedRspKey := rspc.pd2Rps[podKey] + assumedRsp := 
rspc.getResourcePolicyInfoByKey(assumedRspKey, p.Namespace) + if assumedRsp != nil { + valid, labelKeyValue := genLabelKeyValueForPod(assumedRsp.policy, p) + if valid { + assumedRsp.removePod(podKey, labelKeyValue) + } + } + delete(rspc.pd2Rps, podKey) + delete(rspc.assumedPd2Node, podKey) +} + +func (rspc *resourcePolicyCache) DeleteResourcePolicy(rsp *v1alpha1.ResourcePolicy) { + rspc.processingLock.Lock() + defer rspc.processingLock.Unlock() + + ns := rsp.Namespace + rspKey := GetKeyStr(rsp.ObjectMeta) + + if rspInNs, ok := rspc.rps[ns]; ok { + if _, rspok := rspInNs[rspKey]; rspok { + delete(rspInNs, rspKey) + } + if len(rspInNs) == 0 { + delete(rspc.rps, ns) + } + } +} + +func (rspc *resourcePolicyCache) AddOrUpdateResPolicy(rsp *v1alpha1.ResourcePolicy) { + rspc.processingLock.Lock() + defer rspc.processingLock.Unlock() + + ns := rsp.Namespace + rspKey := GetKeyStr(rsp.ObjectMeta) + + if rspInNs, ok := rspc.rps[ns]; ok { + if rspinfo, rspok := rspInNs[rspKey]; rspok { + if rspinfo.rv == rsp.ResourceVersion { + return + } + + rspinfo.processingLock.Lock() + rspinfo.processing = true + + rspinfo.rv = rsp.ResourceVersion + rspinfo.policy = rsp + rspinfo.podSelector = labels.SelectorFromSet(rsp.Spec.Selector) + rspinfo.processingLock.Unlock() + + rspc.wq.AddRateLimited(types.NamespacedName{Namespace: rsp.Namespace, Name: rsp.Name}) + return + } + } else { + rspc.rps[ns] = make(map[keyStr]*resourcePolicyInfo) + } + + newRspInfo := newResourcePolicyInfo() + newRspInfo.processingLock.Lock() + newRspInfo.processing = true + rspc.rps[ns][rspKey] = newRspInfo + + newRspInfo.ks = rspKey + newRspInfo.rv = rsp.ResourceVersion + newRspInfo.policy = rsp + newRspInfo.podSelector = labels.SelectorFromSet(rsp.Spec.Selector) + newRspInfo.processingLock.Unlock() + + rspc.wq.AddRateLimited(types.NamespacedName{Namespace: rsp.Namespace, Name: rsp.Name}) +} + +func (rspc *resourcePolicyCache) UpdateLoop() { + for { + item, shutdown := rspc.wq.Get() + if shutdown { + return + } + + rspc.processingLock.RLock() + ns := item.Namespace + rspKey := item.Namespace + "/" + item.Name + + var ok bool + var rspInNs map[keyStr]*resourcePolicyInfo + var rspinfo *resourcePolicyInfo + if rspInNs, ok = rspc.rps[ns]; !ok { + rspc.wq.Done(item) + rspc.processingLock.RUnlock() + continue + } + if rspinfo, ok = rspInNs[keyStr(rspKey)]; !ok { + rspc.wq.Done(item) + rspc.processingLock.RUnlock() + continue + } + rspc.processingLock.RUnlock() + + rspinfo.complete(rspc.pl, rspc.nl, rspc.ss, rspc.pd2Rps) + rspc.wq.Done(item) + } +} diff --git a/pkg/resourcepolicy/resourcepolicyinfo.go b/pkg/resourcepolicy/resourcepolicyinfo.go new file mode 100644 index 000000000..a6a924361 --- /dev/null +++ b/pkg/resourcepolicy/resourcepolicyinfo.go @@ -0,0 +1,239 @@ +package resourcepolicy + +import ( + "fmt" + "sync" + + "github.com/KunWuLuan/resourcepolicyapi/pkg/apis/scheduling/v1alpha1" + + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/selection" + "k8s.io/apimachinery/pkg/util/sets" + listersv1 "k8s.io/client-go/listers/core/v1" + "k8s.io/klog/v2" + "k8s.io/kubernetes/pkg/api/v1/resource" + "k8s.io/kubernetes/pkg/scheduler/framework" +) + +type resourcePolicyInfo struct { + processingLock sync.Mutex + // when processing is true, the cache of this rspinfo is waiting for reconcile; + // do not schedule pods in this rsp. + processing bool + + ks keyStr + rv string + + policy *v1alpha1.ResourcePolicy + podSelector labels.Selector + nodeSelectors []labels.Selector + // + // [unit.index] { + // "node1": { + // "/": [ pod1.ns/name, ...
], + // "matchLabelKeys": [ pod2.ns/name, ... ], + // } + // } + assumedPods multiLevelPodSet + boundPods multiLevelPodSet + // contains assumed and bound pods + // [unit.index] { + // "/": 1 + // "matchLabelKeys-v1": 1 + // } + assumedPodCount []map[labelKeysValue]int + // + // key: pod.Namespace/pod.Name + podResourceDetails map[keyStr]*framework.Resource + maxPodResources []framework.Resource + assumedPodConsumption []map[labelKeysValue]*framework.Resource +} + +func newResourcePolicyInfo() *resourcePolicyInfo { + return &resourcePolicyInfo{ + assumedPods: make(multiLevelPodSet, 0), + boundPods: make(multiLevelPodSet, 0), + assumedPodCount: make([]map[labelKeysValue]int, 0), + podResourceDetails: make(map[keyStr]*framework.Resource), + maxPodResources: make([]framework.Resource, 0), + assumedPodConsumption: make([]map[labelKeysValue]*framework.Resource, 0), + } +} + +func (rspi *resourcePolicyInfo) complete(pl listersv1.PodLister, nl listersv1.NodeLister, ss framework.NodeInfoLister, + assumedPods map[keyStr]keyStr) { + if !rspi.processing { + return + } + if rspi.policy == nil { + klog.ErrorS(fmt.Errorf("ResourcePolicyInfoNotInited"), "resourcePolicyInfo not inited", "rspKey", rspi.ks) + return + } + + rspi.processingLock.Lock() + defer func() { + // resourcepolicy has been initialized, pods can be scheduled again + rspi.processing = false + rspi.processingLock.Unlock() + }() + + rspi.nodeSelectors = make([]labels.Selector, len(rspi.policy.Spec.Units)) + for idx, unit := range rspi.policy.Spec.Units { + selector := labels.NewSelector() + reqs, _ := labels.SelectorFromSet(unit.NodeSelector.MatchLabels).Requirements() + selector = selector.Add(reqs...) + for _, exp := range unit.NodeSelector.MatchExpressions { + req, err := labels.NewRequirement(exp.Key, selection.Operator(exp.Operator), exp.Values) + if err != nil { + continue + } + selector = selector.Add(*req) + } + rspi.nodeSelectors[idx] = selector + + nodes, err := nl.List(selector) + if err != nil { + continue + } + for _, no := range nodes { + ni, err := ss.Get(no.Name) + if err != nil { + continue + } + for _, po := range ni.Pods { + if GetManagedResourcePolicy(po.Pod) != rspi.ks && + assumedPods[GetKeyStr(po.Pod.ObjectMeta)] != rspi.ks { + continue + } + ok, labelKeyValue := genLabelKeyValueForPod(rspi.policy, po.Pod) + if !ok { + continue + } + res := resource.PodRequests(po.Pod, resource.PodResourcesOptions{}) + rspi.addPodToBoundOrAssumedPods(rspi.boundPods, idx, labelKeyValue, no.Name, GetKeyStr(po.Pod.ObjectMeta), framework.NewResource(res)) + } + } + } +} + +// lock should be acquired outside +// TODO: when pod resources are changed, assumedPodConsumption may be wrong +func (r *resourcePolicyInfo) addPodToBoundOrAssumedPods(ps multiLevelPodSet, index int, labelValues labelKeysValue, nodename string, podKey keyStr, res *framework.Resource) bool { + podsetByValue := ps[index][nodename] + if podsetByValue == nil { + podsetByValue = make(map[labelKeysValue]sets.Set[keyStr]) + } + podset := podsetByValue[labelValues] + if podset == nil { + podset = sets.New[keyStr]() + } + if podset.Has(podKey) { + return false + } + podset.Insert(podKey) + podsetByValue[labelValues] = podset + ps[index][nodename] = podsetByValue + if curRes, ok := r.podResourceDetails[podKey]; !ok || !equalResource(res, curRes) { + r.podResourceDetails[podKey] = res + } + r.updateAssumedPodCount(index, labelValues, 1, res) + return true +} + +// TODO: handle the case that pod's labelKeyValue was changed +func (r *resourcePolicyInfo) removePod(podKeyStr keyStr, labelKeyValue
labelKeysValue) { + for _, podSetByNode := range r.boundPods { + for _, podSetByLabelValues := range podSetByNode { + for _, podSet := range podSetByLabelValues { + podSet.Delete(podKeyStr) + } + } + } + for _, podSetByNode := range r.assumedPods { + for _, podSetByLabelValues := range podSetByNode { + for _, podSet := range podSetByLabelValues { + podSet.Delete(podKeyStr) + } + } + } + + podRes := r.podResourceDetails[podKeyStr] + if podRes != nil { + for idx := range r.assumedPodConsumption { + r.updateAssumedPodResource(idx, labelKeyValue, false, podRes) + } + delete(r.podResourceDetails, podKeyStr) + } +} + +// lock should be get outside +func (r *resourcePolicyInfo) removePodFromBoundOrAssumedPods(ps multiLevelPodSet, index int, labelValues labelKeysValue, nodename string, podKey keyStr) (bool, error) { + podsetByValue := ps[index][nodename] + if podsetByValue == nil { + return false, nil // fmt.Errorf("labelValues %v not found", labelValues) + } + podset := podsetByValue[labelValues] + if podset == nil { + return false, nil // fmt.Errorf("node %v not found", nodename) + } + if !podset.Has(podKey) { + return false, fmt.Errorf("pod %v not found", podKey) + } + podset.Delete(podKey) + if podset.Len() == 0 { + delete(podsetByValue, labelValues) + } else { + podsetByValue[labelValues] = podset + } + ps[index][nodename] = podsetByValue + + res := framework.NewResource(nil) + if detail, ok := r.podResourceDetails[podKey]; ok { + res = detail + } + if err := r.updateAssumedPodCount(index, labelValues, -1, res); err != nil { + return false, err + } + return true, nil +} + +func (r *resourcePolicyInfo) updateAssumedPodCount(index int, v labelKeysValue, count int, totalRes *framework.Resource) error { + if count > 0 { + if r.assumedPodCount[index] == nil { + r.assumedPodCount[index] = map[labelKeysValue]int{v: count} + } else { + r.assumedPodCount[index][v] += count + } + r.updateAssumedPodResource(index, v, true, totalRes) + } else { + if r.assumedPodCount[index] == nil { + return fmt.Errorf("pod not found in assumedPodCount, this should never happen") + } else { + r.assumedPodCount[index][v] += count + if r.assumedPodCount[index][v] == 0 { + delete(r.assumedPodCount[index], v) + } + } + r.updateAssumedPodResource(index, v, false, totalRes) + } + return nil +} + +func (r *resourcePolicyInfo) updateAssumedPodResource(index int, v labelKeysValue, add bool, totalRes *framework.Resource) { + if add { + if r.assumedPodConsumption[index] == nil { + r.assumedPodConsumption[index] = map[labelKeysValue]*framework.Resource{v: totalRes.Clone()} + } else { + addResources(r.assumedPodConsumption[index][v], totalRes) + } + } else { + if r.assumedPodConsumption[index] == nil { + return + } else { + if r.assumedPodConsumption[index][v] == nil { + return + } + subResources(r.assumedPodConsumption[index][v], totalRes) + } + } +} diff --git a/pkg/resourcepolicy/schedulingcontext.go b/pkg/resourcepolicy/schedulingcontext.go new file mode 100644 index 000000000..7583db71e --- /dev/null +++ b/pkg/resourcepolicy/schedulingcontext.go @@ -0,0 +1,11 @@ +package resourcepolicy + +// schedulingContext is created when the pod scheduling started. and the context is deleted +// when the pod is deleted or bound. 
+type schedulingContext struct { + matched keyStr + // when begin to schedule, unitIdx is the index of last scheduling attempt + // in the scheduling cycle, unitIdx is the index of current scheduling attempt + unitIdx int + resourceVersion string +} diff --git a/pkg/resourcepolicy/types.go b/pkg/resourcepolicy/types.go new file mode 100644 index 000000000..9cacc96d6 --- /dev/null +++ b/pkg/resourcepolicy/types.go @@ -0,0 +1,180 @@ +package resourcepolicy + +import ( + "strings" + + "github.com/KunWuLuan/resourcepolicyapi/pkg/apis/scheduling/v1alpha1" + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/kubernetes/pkg/scheduler/framework" +) + +type keyStr string +type labelKeysValue string +type multiLevelPodSet []map[string]map[labelKeysValue]sets.Set[keyStr] + +func genLabelKeysValue(strs []string) labelKeysValue { + return labelKeysValue(strings.Join(strs, "/")) +} + +// ret: valid, labelValue +func genLabelKeyValueForPod(rp *v1alpha1.ResourcePolicy, pod *v1.Pod) (bool, labelKeysValue) { + if len(rp.Spec.MatchLabelKeys) == 0 { + return true, "/" + } + value := "" + for _, key := range rp.Spec.MatchLabelKeys { + if v, ok := pod.Labels[key]; !ok { + return false, "" + } else { + value += v + } + } + return true, labelKeysValue(value) +} + +func GetKeyStr(obj metav1.ObjectMeta) keyStr { + return keyStr(obj.Namespace + "/" + obj.Name) +} + +const ManagedByResourcePolicyAnnoKey = "scheduling.x-k8s.io/managed-by-resourcepolicy" +const ManagedByResourcePolicyIndexKey = "metadata.annotations[scheduling.x-k8s.io/managed-by-resourcepolicy]" +const ResourcePolicyPreFilterStateKey = "scheduling.x-k8s.io/resourcepolicy-prefilter-state" + +type ResourcePolicyPreFilterState struct { + matchedInfo *resourcePolicyInfo + + maxCount []int + currentCount []int + resConsumption []*framework.Resource + maxConsumption []*framework.Resource + + labelKeyValue labelKeysValue + podRes *framework.Resource + + nodeSelectos []labels.Selector +} + +func (r *ResourcePolicyPreFilterState) Clone() framework.StateData { + ret := &ResourcePolicyPreFilterState{ + matchedInfo: r.matchedInfo, + + maxCount: SliceCopyInt(r.maxCount), + currentCount: SliceCopyInt(r.currentCount), + resConsumption: SliceCopyRes(r.resConsumption), + maxConsumption: SliceCopyRes(r.maxConsumption), + + labelKeyValue: r.labelKeyValue, + podRes: r.podRes, + } + return ret +} + +func GetManagedResourcePolicy(p *v1.Pod) keyStr { + return keyStr(p.Annotations[ManagedByResourcePolicyAnnoKey]) +} + +func equalResource(res1, res2 *framework.Resource) bool { + if res1 == nil && res2 == nil { + return true + } + if res1 == nil && res2 != nil || res1 != nil && res2 == nil { + return false + } + if res1.MilliCPU != res2.MilliCPU || + res1.Memory != res2.Memory || + res1.EphemeralStorage != res2.EphemeralStorage || + res1.AllowedPodNumber != res2.AllowedPodNumber { + return false + } + if len(res1.ScalarResources) != len(res2.ScalarResources) { + return false + } + for k, v := range res1.ScalarResources { + if v2, ok := res2.ScalarResources[k]; !ok || v != v2 { + return false + } + } + return true +} + +func addResources(res1, res2 *framework.Resource) { + if res1 == nil || res2 == nil { + return + } + res1.MilliCPU += res2.MilliCPU + res1.Memory += res2.Memory + res1.EphemeralStorage += res2.EphemeralStorage + res1.AllowedPodNumber += res2.AllowedPodNumber + for k, v := range res2.ScalarResources { + if _, ok := res1.ScalarResources[k]; !ok { + 
res1.ScalarResources[k] = v + } else { + res1.ScalarResources[k] += v + } + } +} + +func subResources(res1, res2 *framework.Resource) { + if res1 == nil || res2 == nil { + return + } + res1.MilliCPU -= res2.MilliCPU + if res1.MilliCPU < 0 { + res1.MilliCPU = 0 + } + res1.Memory -= res2.Memory + if res1.Memory < 0 { + res1.Memory = 0 + } + res1.EphemeralStorage -= res2.EphemeralStorage + if res1.EphemeralStorage < 0 { + res1.EphemeralStorage = 0 + } + res1.AllowedPodNumber -= res2.AllowedPodNumber + if res1.AllowedPodNumber < 0 { + res1.AllowedPodNumber = 0 + } + for k, v := range res2.ScalarResources { + if _, ok := res1.ScalarResources[k]; ok { + res1.ScalarResources[k] -= v + if res1.ScalarResources[k] < 0 { + res1.ScalarResources[k] = 0 + } + } + } +} + +func lessThan(res1, res2 *framework.Resource) (bool, string) { + if res1.MilliCPU > 0 && res1.MilliCPU >= res2.MilliCPU { + return false, "cpu" + } + if res1.Memory > 0 && res1.Memory >= res2.Memory { + return false, "memory" + } + if res1.EphemeralStorage > 0 && res1.EphemeralStorage >= res2.EphemeralStorage { + return false, "ephemeral-storage" + } + for k, v := range res1.ScalarResources { + if v2, ok := res2.ScalarResources[k]; !ok || ok && v > 0 && v >= v2 { + return false, string(k) + } + } + return true, "" +} + +func SliceCopyInt(src []int) []int { + dst := make([]int, len(src)) + copy(dst, src) + return dst +} + +func SliceCopyRes(src []*framework.Resource) []*framework.Resource { + dst := make([]*framework.Resource, len(src)) + for i := range src { + dst[i] = src[i].Clone() + } + return dst +} From 9bcdb7dcf873f072708d3e11960bd87ed68f1fa8 Mon Sep 17 00:00:00 2001 From: kunwuluan Date: Mon, 12 May 2025 20:29:34 +0800 Subject: [PATCH 2/4] add copyright Signed-off-by: KunWuLuan --- pkg/resourcepolicy/resourcepolicy.go | 16 ++++++++++++++++ pkg/resourcepolicy/resourcepolicy_cache.go | 16 ++++++++++++++++ pkg/resourcepolicy/resourcepolicyinfo.go | 16 ++++++++++++++++ pkg/resourcepolicy/schedulingcontext.go | 16 ++++++++++++++++ pkg/resourcepolicy/types.go | 16 ++++++++++++++++ 5 files changed, 80 insertions(+) diff --git a/pkg/resourcepolicy/resourcepolicy.go b/pkg/resourcepolicy/resourcepolicy.go index 2016ad9ec..6d8fdf7c3 100644 --- a/pkg/resourcepolicy/resourcepolicy.go +++ b/pkg/resourcepolicy/resourcepolicy.go @@ -1,3 +1,19 @@ +/* +Copyright 2025 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + package resourcepolicy import ( diff --git a/pkg/resourcepolicy/resourcepolicy_cache.go b/pkg/resourcepolicy/resourcepolicy_cache.go index f8c5c4889..68e2c05f3 100644 --- a/pkg/resourcepolicy/resourcepolicy_cache.go +++ b/pkg/resourcepolicy/resourcepolicy_cache.go @@ -1,3 +1,19 @@ +/* +Copyright 2025 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + package resourcepolicy import ( diff --git a/pkg/resourcepolicy/resourcepolicyinfo.go b/pkg/resourcepolicy/resourcepolicyinfo.go index a6a924361..59285a088 100644 --- a/pkg/resourcepolicy/resourcepolicyinfo.go +++ b/pkg/resourcepolicy/resourcepolicyinfo.go @@ -1,3 +1,19 @@ +/* +Copyright 2025 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + package resourcepolicy import ( diff --git a/pkg/resourcepolicy/schedulingcontext.go b/pkg/resourcepolicy/schedulingcontext.go index 7583db71e..ce40d004f 100644 --- a/pkg/resourcepolicy/schedulingcontext.go +++ b/pkg/resourcepolicy/schedulingcontext.go @@ -1,3 +1,19 @@ +/* +Copyright 2025 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + package resourcepolicy // schedulingContext is created when the pod scheduling started. and the context is deleted diff --git a/pkg/resourcepolicy/types.go b/pkg/resourcepolicy/types.go index 9cacc96d6..3dd787925 100644 --- a/pkg/resourcepolicy/types.go +++ b/pkg/resourcepolicy/types.go @@ -1,3 +1,19 @@ +/* +Copyright 2025 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + package resourcepolicy import ( From 68e40f8b518120cd251e7264ead61e8b3199a5f0 Mon Sep 17 00:00:00 2001 From: KunWuLuan Date: Sun, 15 Jun 2025 13:12:09 +0800 Subject: [PATCH 3/4] fix some bugs, integration test passed. 
TODO: add unit tests Signed-off-by: KunWuLuan --- cmd/scheduler/main.go | 2 + go.mod | 36 ++-- go.sum | 26 +-- pkg/config/samples/resourcepolicy/crd.yaml | 98 +++++++++++ .../samples/resourcepolicy/deployment.yaml | 24 +++ .../resourcepolicy/resourcepolicy.yaml | 12 ++ .../samples/resourcepolicy/scheduler.config | 12 ++ pkg/resourcepolicy/resourcepolicy.go | 154 ++++++++++++++++-- pkg/resourcepolicy/resourcepolicy_cache.go | 31 ++-- pkg/resourcepolicy/resourcepolicy_test.go | 81 +++++++++ pkg/resourcepolicy/resourcepolicyinfo.go | 41 +++-- pkg/resourcepolicy/types.go | 29 ++-- 12 files changed, 458 insertions(+), 88 deletions(-) create mode 100644 pkg/config/samples/resourcepolicy/crd.yaml create mode 100644 pkg/config/samples/resourcepolicy/deployment.yaml create mode 100644 pkg/config/samples/resourcepolicy/resourcepolicy.yaml create mode 100644 pkg/config/samples/resourcepolicy/scheduler.config create mode 100644 pkg/resourcepolicy/resourcepolicy_test.go diff --git a/cmd/scheduler/main.go b/cmd/scheduler/main.go index 7c915be2c..27b938ce5 100644 --- a/cmd/scheduler/main.go +++ b/cmd/scheduler/main.go @@ -33,6 +33,7 @@ import ( "sigs.k8s.io/scheduler-plugins/pkg/podstate" "sigs.k8s.io/scheduler-plugins/pkg/preemptiontoleration" "sigs.k8s.io/scheduler-plugins/pkg/qos" + "sigs.k8s.io/scheduler-plugins/pkg/resourcepolicy" "sigs.k8s.io/scheduler-plugins/pkg/sysched" "sigs.k8s.io/scheduler-plugins/pkg/trimaran/loadvariationriskbalancing" "sigs.k8s.io/scheduler-plugins/pkg/trimaran/lowriskovercommitment" @@ -60,6 +61,7 @@ func main() { app.WithPlugin(lowriskovercommitment.Name, lowriskovercommitment.New), app.WithPlugin(sysched.Name, sysched.New), app.WithPlugin(peaks.Name, peaks.New), + app.WithPlugin(resourcepolicy.Name, resourcepolicy.New), // Sample plugins below. 
// app.WithPlugin(crossnodepreemption.Name, crossnodepreemption.New), app.WithPlugin(podstate.Name, podstate.New), diff --git a/go.mod b/go.mod index ccae6a5b3..c9533a858 100644 --- a/go.mod +++ b/go.mod @@ -1,15 +1,15 @@ module sigs.k8s.io/scheduler-plugins -go 1.24.2 +go 1.23.0 require ( - github.com/KunWuLuan/resourcepolicyapi v0.0.0-20250429133120-0b5462735cdc + github.com/KunWuLuan/resourcepolicyapi v0.0.0-20250727121406-048ef1326d94 github.com/containers/common v0.46.0 github.com/diktyo-io/appgroup-api v1.0.1-alpha github.com/diktyo-io/networktopology-api v1.0.1-alpha github.com/dustin/go-humanize v1.0.1 github.com/go-logr/logr v1.4.2 - github.com/google/go-cmp v0.6.0 + github.com/google/go-cmp v0.7.0 github.com/k8stopologyawareschedwg/noderesourcetopology-api v0.1.2 github.com/k8stopologyawareschedwg/podfingerprint v0.2.2 github.com/patrickmn/go-cache v2.1.0+incompatible @@ -17,18 +17,18 @@ require ( github.com/spf13/pflag v1.0.5 github.com/stretchr/testify v1.9.0 gonum.org/v1/gonum v0.12.0 - k8s.io/api v0.33.0 - k8s.io/apimachinery v0.33.0 + k8s.io/api v0.33.1 + k8s.io/apimachinery v0.33.1 k8s.io/apiserver v0.32.5 - k8s.io/client-go v0.32.5 + k8s.io/client-go v0.33.1 k8s.io/code-generator v0.33.0 k8s.io/component-base v0.32.5 k8s.io/component-helpers v0.32.5 k8s.io/klog/v2 v2.130.1 - k8s.io/kube-scheduler v0.31.8 - k8s.io/kubernetes v1.31.8 + k8s.io/kube-scheduler v0.32.5 + k8s.io/kubernetes v1.32.5 k8s.io/utils v0.0.0-20241104100929-3ea5e8cea738 - sigs.k8s.io/controller-runtime v0.19.2 + sigs.k8s.io/controller-runtime v0.20.4 sigs.k8s.io/logtools v0.9.0 sigs.k8s.io/security-profiles-operator v0.4.0 sigs.k8s.io/structured-merge-diff/v4 v4.6.0 @@ -64,7 +64,7 @@ require ( github.com/go-openapi/swag v0.23.0 // indirect github.com/gogo/protobuf v1.3.2 // indirect github.com/golang/protobuf v1.5.4 // indirect - github.com/google/btree v1.0.1 // indirect + github.com/google/btree v1.1.3 // indirect github.com/google/cel-go v0.22.0 // indirect github.com/google/gnostic-models v0.6.9 // indirect github.com/google/gofuzz v1.2.0 // indirect @@ -112,12 +112,12 @@ require ( golang.org/x/exp v0.0.0-20240719175910-8a7402abbf56 // indirect golang.org/x/mod v0.21.0 // indirect golang.org/x/net v0.38.0 // indirect - golang.org/x/oauth2 v0.23.0 // indirect + golang.org/x/oauth2 v0.27.0 // indirect golang.org/x/sync v0.12.0 // indirect golang.org/x/sys v0.31.0 // indirect golang.org/x/term v0.30.0 // indirect golang.org/x/text v0.23.0 // indirect - golang.org/x/time v0.7.0 // indirect + golang.org/x/time v0.9.0 // indirect golang.org/x/tools v0.26.0 // indirect gomodules.xyz/jsonpatch/v2 v2.4.0 // indirect google.golang.org/genproto/googleapis/api v0.0.0-20240826202546-f6391c0de4c7 // indirect @@ -129,11 +129,11 @@ require ( gopkg.in/natefinch/lumberjack.v2 v2.2.1 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect - k8s.io/apiextensions-apiserver v0.31.8 // indirect + k8s.io/apiextensions-apiserver v0.32.1 // indirect k8s.io/cloud-provider v0.32.5 // indirect k8s.io/controller-manager v0.32.5 // indirect - k8s.io/csi-translation-lib v0.31.8 // indirect - k8s.io/dynamic-resource-allocation v0.31.8 // indirect + k8s.io/csi-translation-lib v0.32.5 // indirect + k8s.io/dynamic-resource-allocation v0.32.5 // indirect k8s.io/gengo/v2 v2.0.0-20250207200755-1244d31929d7 // indirect k8s.io/kms v0.32.5 // indirect k8s.io/kube-openapi v0.0.0-20250318190949-c8a335a9a2ff // indirect @@ -167,10 +167,8 @@ replace ( k8s.io/kube-aggregator => k8s.io/kube-aggregator v0.32.5 
k8s.io/kube-controller-manager => k8s.io/kube-controller-manager v0.32.5 k8s.io/kube-proxy => k8s.io/kube-proxy v0.32.5 - k8s.io/kube-scheduler => k8s.io/kube-scheduler v0.32.5 k8s.io/kubectl => k8s.io/kubectl v0.32.5 k8s.io/kubelet => k8s.io/kubelet v0.32.5 - k8s.io/kubernetes => k8s.io/kubernetes v1.32.5 k8s.io/legacy-cloud-providers => k8s.io/legacy-cloud-providers v0.31.8 k8s.io/metrics => k8s.io/metrics v0.32.5 k8s.io/mount-utils => k8s.io/mount-utils v0.32.5 @@ -183,3 +181,7 @@ replace k8s.io/externaljwt => k8s.io/externaljwt v0.32.5 replace k8s.io/sample-cli-plugin => k8s.io/sample-cli-plugin v0.32.5 replace k8s.io/sample-controller => k8s.io/sample-controller v0.32.5 + +replace k8s.io/kube-scheduler => k8s.io/kube-scheduler v0.32.5 + +replace k8s.io/kubernetes => k8s.io/kubernetes v1.32.5 diff --git a/go.sum b/go.sum index d31151e0e..d876c6237 100644 --- a/go.sum +++ b/go.sum @@ -1342,8 +1342,8 @@ github.com/BurntSushi/toml v1.3.2/go.mod h1:CxXYINrC8qIiEnFrOxCa7Jy5BFHlXnUU2pbi github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo= github.com/JeffAshton/win_pdh v0.0.0-20161109143554-76bb4ee9f0ab/go.mod h1:3VYc5hodBMJ5+l/7J4xAyMeuM2PNuepvHlGs8yilUCA= github.com/JohnCGriffin/overflow v0.0.0-20211019200055-46fa312c352c/go.mod h1:X0CRv0ky0k6m906ixxpzmDRLvX58TFUKS2eePweuyxk= -github.com/KunWuLuan/resourcepolicyapi v0.0.0-20250429133120-0b5462735cdc h1:tQ0XOIup39emiznGpxgV+PQAeqKvr0DeLfZf9ReqQfc= -github.com/KunWuLuan/resourcepolicyapi v0.0.0-20250429133120-0b5462735cdc/go.mod h1:6G5vMUV/xm+IEwNrNaxvZZr6WiqAHq+o+YsU0TiwXO0= +github.com/KunWuLuan/resourcepolicyapi v0.0.0-20250727121406-048ef1326d94 h1:XQQCH4Wf2n627vWxr7+uIKuHwzlQ5MFLZN/tD+jzwU4= +github.com/KunWuLuan/resourcepolicyapi v0.0.0-20250727121406-048ef1326d94/go.mod h1:kPff4zxO6pT4b/nJQYmMM2giPbVmi6FiE5pQTS8Ux34= github.com/MakeNowJust/heredoc v1.0.0/go.mod h1:mG5amYoWBHf8vpLOuehzbGGw0EHxpZZ6lCpQ4fNJ8LE= github.com/Microsoft/go-winio v0.4.11/go.mod h1:VhR8bwka0BXejwEJY73c50VrPtXAaKcyvVC4A4RozmA= github.com/Microsoft/go-winio v0.4.14/go.mod h1:qXqCSQ3Xa7+6tgxaGTIe4Kpcdsi+P8jBhyzoq1bpyYA= @@ -1908,8 +1908,9 @@ github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEW github.com/gomarkdown/markdown v0.0.0-20210514010506-3b9f47219fe7/go.mod h1:aii0r/K0ZnHv7G0KF7xy1v0A7s2Ljrb5byB7MO5p6TU= github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= github.com/google/btree v1.0.0/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= -github.com/google/btree v1.0.1 h1:gK4Kx5IaGY9CD5sPJ36FHiBJ6ZXl0kilRiiCj+jdYp4= github.com/google/btree v1.0.1/go.mod h1:xXMiIv4Fb/0kKde4SpL7qlzvu5cMJDRkFDxJfI9uaxA= +github.com/google/btree v1.1.3 h1:CVpQJjYgC4VbzxeGVHfvZrv1ctoYCAI8vbl07Fcxlyg= +github.com/google/btree v1.1.3/go.mod h1:qOPhT0dTNdNzV6Z/lhRX0YXUafgPLFUh+gZMl761Gm4= github.com/google/cadvisor v0.51.0/go.mod h1:czGE/c/P/i0QFpVNKTFrIEzord9Y10YfpwuaSWXELc0= github.com/google/cel-go v0.22.0 h1:b3FJZxpiv1vTMo2/5RDUqAHPxkT8mmMfJIrq1llbf7g= github.com/google/cel-go v0.22.0/go.mod h1:BuznPXXfQDpXKWQ9sPW3TzlAJN5zzFe+i9tIs0yC4s8= @@ -1933,8 +1934,9 @@ github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/ github.com/google/go-cmp v0.5.7/go.mod h1:n+brtR0CgQNWTVd5ZUFpTBC8YFBDLK/h/bpaJ8/DtOE= github.com/google/go-cmp v0.5.8/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= -github.com/google/go-cmp 
v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8= +github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU= github.com/google/go-github v17.0.0+incompatible/go.mod h1:zLgOLi98H3fifZn+44m+umXrS52loVEgC2AApnigrVQ= github.com/google/go-github/v33 v33.0.0/go.mod h1:GMdDnVZY/2TsWgp/lkYnpSAh6TrzhANBBwm6k6TTEXg= github.com/google/go-intervals v0.0.2/go.mod h1:MkaR3LNRfeKLPmqgJYs4E66z5InYjmCjbbr4TQlcT6Y= @@ -2309,8 +2311,9 @@ github.com/onsi/ginkgo/v2 v2.17.1/go.mod h1:llBI3WDLL9Z6taip6f33H76YcWtJv+7R3Hig github.com/onsi/ginkgo/v2 v2.17.2/go.mod h1:nP2DPOQoNsQmsVyv5rDA8JkXQoCs6goXIvr/PRJ1eCc= github.com/onsi/ginkgo/v2 v2.19.0/go.mod h1:rlwLi9PilAFJ8jCg9UE1QP6VBpd6/xj3SRC0d6TU0To= github.com/onsi/ginkgo/v2 v2.20.1/go.mod h1:lG9ey2Z29hR41WMVthyJBGUBcBhGOtoPF2VFMvBXFCI= -github.com/onsi/ginkgo/v2 v2.21.0 h1:7rg/4f3rB88pb5obDgNZrNHrQ4e6WpjonchcpuBRnZM= github.com/onsi/ginkgo/v2 v2.21.0/go.mod h1:7Du3c42kxCUegi0IImZ1wUQzMBVecgIHjR1C+NkhLQo= +github.com/onsi/ginkgo/v2 v2.22.0 h1:Yed107/8DjTr0lKCNt7Dn8yQ6ybuDRQoMGrNFKzMfHg= +github.com/onsi/ginkgo/v2 v2.22.0/go.mod h1:7Du3c42kxCUegi0IImZ1wUQzMBVecgIHjR1C+NkhLQo= github.com/onsi/gomega v0.0.0-20151007035656-2152b45fa28a/go.mod h1:C1qb7wdrVGGVU+Z6iS04AVkA3Q65CEZX59MT0QO5uiA= github.com/onsi/gomega v1.5.0/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY= github.com/onsi/gomega v1.7.0/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY= @@ -2342,8 +2345,9 @@ github.com/onsi/gomega v1.33.0/go.mod h1:+925n5YtiFsLzzafLUHzVMBpvvRAzrydIBiSIxj github.com/onsi/gomega v1.33.1/go.mod h1:U4R44UsT+9eLIaYRB2a5qajjtQYn0hauxvRm16AVYg0= github.com/onsi/gomega v1.34.1/go.mod h1:kU1QgUvBDLXBJq618Xvm2LUX6rSAfRaFRTcdOeDLwwY= github.com/onsi/gomega v1.34.2/go.mod h1:v1xfxRgk0KIsG+QOdm7p8UosrOzPYRo60fd3B/1Dukc= -github.com/onsi/gomega v1.35.1 h1:Cwbd75ZBPxFSuZ6T+rN/WCb/gOc6YgFBXLlZLhC7Ds4= github.com/onsi/gomega v1.35.1/go.mod h1:PvZbdDc8J6XJEpDK4HCuRBm8a6Fzp9/DmhC9C7yFlog= +github.com/onsi/gomega v1.36.1 h1:bJDPBO7ibjxcbHMgSCoo4Yj18UWbKDlLwX1x9sybDcw= +github.com/onsi/gomega v1.36.1/go.mod h1:PvZbdDc8J6XJEpDK4HCuRBm8a6Fzp9/DmhC9C7yFlog= github.com/opencontainers/go-digest v0.0.0-20170106003457-a6d0ee40d420/go.mod h1:cMLVZDEM3+U2I4VmLI6N8jQYUd2OVphdqWwCJHrFt2s= github.com/opencontainers/go-digest v0.0.0-20180430190053-c9281466c8b2/go.mod h1:cMLVZDEM3+U2I4VmLI6N8jQYUd2OVphdqWwCJHrFt2s= github.com/opencontainers/go-digest v1.0.0-rc1/go.mod h1:cMLVZDEM3+U2I4VmLI6N8jQYUd2OVphdqWwCJHrFt2s= @@ -3077,8 +3081,9 @@ golang.org/x/oauth2 v0.17.0/go.mod h1:OzPDGQiuQMguemayvdylqddI7qcD9lnSDb+1FiwQ5H golang.org/x/oauth2 v0.18.0/go.mod h1:Wf7knwG0MPoWIMMBgFlEaSUDaKskp0dCfrlJRJXbBi8= golang.org/x/oauth2 v0.20.0/go.mod h1:XYTD2NtWslqkgxebSiOHnXEap4TF09sJSc7H1sXbhtI= golang.org/x/oauth2 v0.21.0/go.mod h1:XYTD2NtWslqkgxebSiOHnXEap4TF09sJSc7H1sXbhtI= -golang.org/x/oauth2 v0.23.0 h1:PbgcYx2W7i4LvjJWEbf0ngHV6qJYr86PkAV3bXdLEbs= golang.org/x/oauth2 v0.23.0/go.mod h1:XYTD2NtWslqkgxebSiOHnXEap4TF09sJSc7H1sXbhtI= +golang.org/x/oauth2 v0.27.0 h1:da9Vo7/tDv5RH/7nZDz1eMGS/q1Vv1N/7FCrBhI9I3M= +golang.org/x/oauth2 v0.27.0/go.mod h1:onh5ek6nERTohokkhCD/y2cV4Do3fxFHFuAejCkRWT8= golang.org/x/perf v0.0.0-20180704124530-6e6d33e29852/go.mod h1:JLpeXjPJfIyPr5TlbXLkXWLhP8nz10XfvxElABhCtcw= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= 
golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= @@ -3336,8 +3341,9 @@ golang.org/x/time v0.0.0-20220922220347-f3bd1da661af/go.mod h1:tRJNPiyCQ0inRvYxb golang.org/x/time v0.1.0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.3.0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.5.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM= -golang.org/x/time v0.7.0 h1:ntUhktv3OPE6TgYxXWv9vKvUSJyIFJlyohwbkEwPrKQ= golang.org/x/time v0.7.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM= +golang.org/x/time v0.9.0 h1:EsRrnYcQiGH+5FfbgvV4AP7qEZstoyrHB0DzarOQ4ZY= +golang.org/x/time v0.9.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM= golang.org/x/tools v0.0.0-20180221164845-07fd8470d635/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20180525024113-a5b4c53f6e8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20180828015842-6cd1fcedba52/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= @@ -4101,8 +4107,8 @@ sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.31.0 h1:CPT0ExVicCzcp sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.31.0/go.mod h1:Ve9uj1L+deCXFrPOk1LpFXqTg7LCFzFso6PA48q/XZw= sigs.k8s.io/controller-runtime v0.9.2/go.mod h1:TxzMCHyEUpaeuOiZx/bIdc2T81vfs/aKdvJt9wuu0zk= sigs.k8s.io/controller-runtime v0.10.3/go.mod h1:CQp8eyUQZ/Q7PJvnIrB6/hgfTC1kBkGylwsLgOQi1WY= -sigs.k8s.io/controller-runtime v0.19.2 h1:3sPrF58XQEPzbE8T81TN6selQIMGbtYwuaJ6eDssDF8= -sigs.k8s.io/controller-runtime v0.19.2/go.mod h1:iRmWllt8IlaLjvTTDLhRBXIEtkCK6hwVBJJsYS9Ajf4= +sigs.k8s.io/controller-runtime v0.20.4 h1:X3c+Odnxz+iPTRobG4tp092+CvBU9UK0t/bRf+n0DGU= +sigs.k8s.io/controller-runtime v0.20.4/go.mod h1:xg2XB0K5ShQzAgsoujxuKN4LNXR2LfwwHsPj7Iaw+XY= sigs.k8s.io/controller-tools v0.2.4/go.mod h1:m/ztfQNocGYBgTTCmFdnK94uVvgxeZeE3LtJvd/jIzA= sigs.k8s.io/controller-tools v0.7.0/go.mod h1:bpBAo0VcSDDLuWt47evLhMLPxRPxMDInTEH/YbdeMK0= sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd/go.mod h1:B8JuhiUyNFVKdsE8h686QcCxMaH6HrOAZj4vswFpcB0= diff --git a/pkg/config/samples/resourcepolicy/crd.yaml b/pkg/config/samples/resourcepolicy/crd.yaml new file mode 100644 index 000000000..00a48dd12 --- /dev/null +++ b/pkg/config/samples/resourcepolicy/crd.yaml @@ -0,0 +1,98 @@ +--- +apiVersion: apiextensions.k8s.io/v1 +kind: CustomResourceDefinition +metadata: + annotations: + controller-gen.kubebuilder.io/version: v0.18.1-0.20250610061224-4cb7e87f3b45 + name: resourcepolicies.scheduling.x-k8s.io +spec: + group: scheduling.x-k8s.io + names: + kind: ResourcePolicy + listKind: ResourcePolicyList + plural: resourcepolicies + shortNames: + - rsp + singular: resourcepolicy + scope: Namespaced + versions: + - name: v1alpha1 + schema: + openAPIV3Schema: + properties: + apiVersion: + description: |- + APIVersion defines the versioned schema of this representation of an object. + Servers should convert recognized schemas to the latest internal value, and + may reject unrecognized values. + More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources + type: string + kind: + description: |- + Kind is a string value representing the REST resource this object represents. + Servers may infer this from the endpoint the client submits requests to. + Cannot be updated. + In CamelCase. 
+ More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds + type: string + metadata: + type: object + spec: + properties: + matchLabelKeys: + items: + type: string + type: array + selector: + additionalProperties: + type: string + type: object + units: + items: + properties: + max: + format: int32 + type: integer + maxResources: + additionalProperties: + anyOf: + - type: integer + - type: string + pattern: ^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$ + x-kubernetes-int-or-string: true + description: ResourceList is a set of (resource name, quantity) + pairs. + type: object + nodeSelector: + additionalProperties: + type: string + type: object + podAnnotations: + additionalProperties: + type: string + type: object + podLabels: + additionalProperties: + type: string + type: object + type: object + nullable: true + type: array + x-kubernetes-list-type: atomic + type: object + status: + properties: + lastUpdateTime: + format: date-time + type: string + pods: + items: + format: int64 + type: integer + type: array + type: object + required: + - spec + type: object + served: true + storage: true \ No newline at end of file diff --git a/pkg/config/samples/resourcepolicy/deployment.yaml b/pkg/config/samples/resourcepolicy/deployment.yaml new file mode 100644 index 000000000..1e6b74a59 --- /dev/null +++ b/pkg/config/samples/resourcepolicy/deployment.yaml @@ -0,0 +1,24 @@ +apiVersion: apps/v1 +kind: Deployment +metadata: + name: nginx-deployment +spec: + replicas: 1 + selector: + matchLabels: + app: nginx + template: + metadata: + labels: + app: nginx + spec: + schedulerName: custom-scheduler + containers: + - name: nginx-container + resources: + limits: + cpu: 50m + memory: 50Mi + image: nginx:latest + ports: + - containerPort: 80 \ No newline at end of file diff --git a/pkg/config/samples/resourcepolicy/resourcepolicy.yaml b/pkg/config/samples/resourcepolicy/resourcepolicy.yaml new file mode 100644 index 000000000..5104e7030 --- /dev/null +++ b/pkg/config/samples/resourcepolicy/resourcepolicy.yaml @@ -0,0 +1,12 @@ +apiVersion: scheduling.x-k8s.io/v1alpha1 +kind: ResourcePolicy +metadata: + name: resourcepolicy-sample + namespace: default +spec: + selector: + app: nginx + units: + - max: 1 + nodeSelector: + unit: first \ No newline at end of file diff --git a/pkg/config/samples/resourcepolicy/scheduler.config b/pkg/config/samples/resourcepolicy/scheduler.config new file mode 100644 index 000000000..d684dc3f5 --- /dev/null +++ b/pkg/config/samples/resourcepolicy/scheduler.config @@ -0,0 +1,12 @@ +apiVersion: kubescheduler.config.k8s.io/v1 +kind: KubeSchedulerConfiguration +leaderElection: + leaderElect: false +clientConnection: + kubeconfig: /home/kunwu1997/.kube/config +profiles: +- schedulerName: custom-scheduler + plugins: + multiPoint: + enabled: + - name: ResourcePolicy \ No newline at end of file diff --git a/pkg/resourcepolicy/resourcepolicy.go b/pkg/resourcepolicy/resourcepolicy.go index 6d8fdf7c3..00c845585 100644 --- a/pkg/resourcepolicy/resourcepolicy.go +++ b/pkg/resourcepolicy/resourcepolicy.go @@ -20,6 +20,7 @@ import ( "context" "fmt" "math" + "strings" "github.com/KunWuLuan/resourcepolicyapi/pkg/apis/scheduling/v1alpha1" @@ -29,9 +30,11 @@ import ( v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/tools/cache" + "k8s.io/client-go/util/retry" + resourcehelper 
"k8s.io/component-helpers/resource" "k8s.io/klog/v2" - "k8s.io/kubernetes/pkg/api/v1/resource" "k8s.io/kubernetes/pkg/scheduler/framework" ) @@ -59,7 +62,7 @@ func (rspp *resourcePolicyPlugin) Name() string { func New(ctx context.Context, args runtime.Object, handle framework.Handle) (framework.Plugin, error) { lh := klog.FromContext(ctx).WithValues("plugin", Name) - lh.V(5).Info("creating new coscheduling plugin") + lh.V(2).Info("creating new resourcepolicy plugin") podLister := handle.SharedInformerFactory().Core().V1().Pods().Lister() nodeLister := handle.SharedInformerFactory().Core().V1().Nodes().Lister() @@ -77,8 +80,16 @@ func New(ctx context.Context, args runtime.Object, handle framework.Handle) (fra if err != nil { return nil, err } + defer func() { + ccache.Start(ctx) + ccache.WaitForCacheSync(ctx) + lh.V(2).Info("ResourcePolicyCache synced.") + }() - rspInformer, _ := ccache.GetInformerForKind(ctx, v1alpha1.SchemeGroupVersion.WithKind("ResourcePolicy")) + rspInformer, err := ccache.GetInformerForKind(ctx, v1alpha1.SchemeGroupVersion.WithKind("ResourcePolicy")) + if err != nil { + return nil, err + } rspInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { rsp, ok := obj.(*v1alpha1.ResourcePolicy) @@ -109,12 +120,42 @@ func New(ctx context.Context, args runtime.Object, handle framework.Handle) (fra }, }) - handle.SharedInformerFactory().Core().V1().Nodes().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{}) + handle.SharedInformerFactory().Core().V1().Nodes().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + switch t := obj.(type) { + case *v1.Node: + node := t + pods, err := handle.SharedInformerFactory().Core().V1().Pods().Informer().GetIndexer().ByIndex("spec.nodeName", node.Name) + if err != nil { + klog.Error(err, "get node pods failed") + return + } + rspCache.processingLock.Lock() + defer rspCache.processingLock.Unlock() + + podKeys := []string{} + for _, i := range pods { + pod := i.(*v1.Pod) + rsp := GetManagedResourcePolicy(pod) + if rsp == "" { + continue + } + podKeys = append(podKeys, string(klog.KObj(pod).String())) + rspCache.AddOrUpdateBoundPod(pod) + } + klog.InfoS("add node event", "processedPod", strings.Join(podKeys, ",")) + } + }, + }) handle.SharedInformerFactory().Core().V1().Pods().Informer().AddIndexers(cache.Indexers{ ManagedByResourcePolicyIndexKey: func(obj interface{}) ([]string, error) { return []string{string(GetManagedResourcePolicy(obj.(*v1.Pod)))}, nil }, + "spec.nodeName": func(obj interface{}) ([]string, error) { + p := obj.(*v1.Pod) + return []string{p.Spec.NodeName}, nil + }, }) handle.SharedInformerFactory().Core().V1().Pods().Informer().AddEventHandler(cache.FilteringResourceEventHandler{ FilterFunc: func(obj interface{}) bool { @@ -136,14 +177,20 @@ func New(ctx context.Context, args runtime.Object, handle framework.Handle) (fra if !ok { return } + rspCache.processingLock.Lock() + defer rspCache.processingLock.Unlock() rspCache.AddOrUpdateBoundPod(pd) + klog.Info("add event for scheduled pod", "pod", klog.KObj(pd)) }, UpdateFunc: func(oldObj, newObj interface{}) { pd, ok := newObj.(*v1.Pod) if !ok { return } + rspCache.processingLock.Lock() + defer rspCache.processingLock.Unlock() rspCache.AddOrUpdateBoundPod(pd) + klog.Info("update event for scheduled pod", "pod", klog.KObj(pd)) }, DeleteFunc: func(obj interface{}) { var pd *v1.Pod @@ -163,12 +210,10 @@ func New(ctx context.Context, args runtime.Object, handle framework.Handle) (fra }) plg := 
&resourcePolicyPlugin{ - cache: &resourcePolicyCache{ - pd2Rps: make(map[keyStr]keyStr), - rps: make(map[string]map[keyStr]*resourcePolicyInfo), - }, - client: c, - handle: handle, + cache: rspCache, + client: c, + handle: handle, + schedulingCtx: map[keyStr]*schedulingContext{}, } return plg, nil } @@ -184,6 +229,7 @@ func (rspp *resourcePolicyPlugin) PreFilter(ctx context.Context, state *framewor var matched *resourcePolicyInfo var schedCtx *schedulingContext var ok bool + logger := klog.FromContext(ctx).WithValues("pod", klog.KObj(pod)) rspp.cache.processingLock.RLock() podKey := GetKeyStr(pod.ObjectMeta) if schedCtx, ok = rspp.schedulingCtx[podKey]; ok && schedCtx.matched != "" { @@ -193,13 +239,15 @@ func (rspp *resourcePolicyPlugin) PreFilter(ctx context.Context, state *framewor matched = nil } } else { - rspp.schedulingCtx[podKey] = &schedulingContext{} + schedCtx = &schedulingContext{} + rspp.schedulingCtx[podKey] = schedCtx } if matched == nil { resourcePoliciesInNamespace := rspp.cache.rps[pod.Namespace] if len(resourcePoliciesInNamespace) == 0 { rspp.cache.processingLock.RUnlock() + logger.V(2).Info("no resourcePolicy matches pod") return nil, framework.NewStatus(framework.Skip) } @@ -216,12 +264,19 @@ func (rspp *resourcePolicyPlugin) PreFilter(ctx context.Context, state *framewor rspp.cache.processingLock.RUnlock() if matched == nil { + logger.V(2).Info("no resourcePolicy matches pod") return nil, framework.NewStatus(framework.Skip, "no resourcePolicy matches pod") } else { + logger = logger.WithValues("resourcePolicy", matched.ks) schedCtx.matched = matched.ks schedCtx.resourceVersion = matched.rv schedCtx.unitIdx = 0 } + matched.processingLock.Lock() + for matched.processing { + matched.cond.Wait() + } + matched.processingLock.Unlock() valid, labelKeyValue := genLabelKeyValueForPod(matched.policy, pod) if !valid { @@ -229,7 +284,7 @@ func (rspp *resourcePolicyPlugin) PreFilter(ctx context.Context, state *framewor } preFilterState := &ResourcePolicyPreFilterState{ matchedInfo: matched, - podRes: framework.NewResource(resource.PodRequests(pod, resource.PodResourcesOptions{})), + podRes: framework.NewResource(resourcehelper.PodRequests(pod, resourcehelper.PodResourcesOptions{})), labelKeyValue: labelKeyValue, currentCount: make([]int, len(matched.nodeSelectors)), @@ -246,6 +301,9 @@ func (rspp *resourcePolicyPlugin) PreFilter(ctx context.Context, state *framewor preFilterState.resConsumption[idx] = consumption[labelKeyValue] } for idx, max := range matched.maxPodResources { + if max == nil { + continue + } preFilterState.maxConsumption[idx] = max.Clone() } copy(preFilterState.nodeSelectos, matched.nodeSelectors) @@ -257,9 +315,35 @@ func (rspp *resourcePolicyPlugin) PreFilter(ctx context.Context, state *framewor } } state.Write(ResourcePolicyPreFilterStateKey, preFilterState) + logger.V(2).Info("details of matched resource policy", "maxCount", preFilterState.maxCount, "currentCount", preFilterState.currentCount, + "maxConsumption", resourceListToStr(preFilterState.maxConsumption), "resConsumption", resourceListToStr(preFilterState.resConsumption), + "nodeSelectors", nodeSelectorsToStr(preFilterState.nodeSelectos)) return nil, nil } +func nodeSelectorsToStr(selectors []labels.Selector) string { + res := []string{} + for _, selector := range selectors { + if selector == nil { + continue + } + res = append(res, selector.String()) + } + return strings.Join(res, "|") +} + +func resourceListToStr(list []*framework.Resource) string { + res := []string{} + for _, r := range list { + if 
r == nil { + res = append(res, "nil") + continue + } + res = append(res, fmt.Sprintf("cpu:%v,memory:%v,scalar:%+v", r.MilliCPU, r.Memory, r.ScalarResources)) + } + return strings.Join(res, "|") +} + func (rspp *resourcePolicyPlugin) PreFilterExtensions() framework.PreFilterExtensions { return nil } @@ -283,12 +367,20 @@ func findAvailableUnitForNode(nodeInfo *framework.NodeInfo, state *ResourcePolic }) continue } - if lt, res := lessThan(state.resConsumption[idx], state.maxConsumption[idx]); lt { - notValidIdx = append(notValidIdx, unitNotAvaiInfo{ - idx: idx, - res: res, - }) - continue + if state.maxConsumption[idx] != nil && state.resConsumption[idx] != nil { + res := state.resConsumption[idx].Clone() + res.MilliCPU += state.podRes.MilliCPU + res.Memory += state.podRes.Memory + for k, v := range state.podRes.ScalarResources { + res.AddScalar(k, v) + } + if gt, res := largeThan(res, state.maxConsumption[idx]); gt { + notValidIdx = append(notValidIdx, unitNotAvaiInfo{ + idx: idx, + res: res, + }) + continue + } } found = idx break @@ -306,6 +398,9 @@ func (rspp *resourcePolicyPlugin) Filter(ctx context.Context, state *framework.C return framework.AsStatus(fmt.Errorf("cannot convert %T to ResourcePolicyPreFilterState", obj)) } if avai, info := findAvailableUnitForNode(nodeInfo, preFilterState); avai == -1 { + if len(info) == 0 { + return framework.NewStatus(framework.UnschedulableAndUnresolvable, fmt.Sprintf("not match any unit")) + } return framework.NewStatus(framework.UnschedulableAndUnresolvable, fmt.Sprintf("unit not available: %v", info)) } return nil @@ -356,5 +451,28 @@ func (rspp *resourcePolicyPlugin) Unreserve(ctx context.Context, state *framewor } func (rspp *resourcePolicyPlugin) PreBind(ctx context.Context, state *framework.CycleState, p *v1.Pod, nodeName string) *framework.Status { + obj, err := state.Read(ResourcePolicyPreFilterStateKey) + if err != nil { + return nil + } + preFilterState, ok := obj.(*ResourcePolicyPreFilterState) + if !ok { + return framework.AsStatus(fmt.Errorf("unable to convert state to ResourcePolicyPreFilterState")) + } + newPod := v1.Pod{} + err = retry.RetryOnConflict(retry.DefaultRetry, func() error { + err := rspp.client.Get(ctx, types.NamespacedName{Namespace: p.Namespace, Name: p.Name}, &newPod) + if err != nil { + return err + } + if len(newPod.Annotations) == 0 { + newPod.Annotations = make(map[string]string) + } + newPod.Annotations[ManagedByResourcePolicyAnnoKey] = string(preFilterState.matchedInfo.ks) + return rspp.client.Update(ctx, &newPod) + }) + if err != nil { + return framework.AsStatus(fmt.Errorf("unable to get pod %v/%v", p.Namespace, p.Name)) + } return nil } diff --git a/pkg/resourcepolicy/resourcepolicy_cache.go b/pkg/resourcepolicy/resourcepolicy_cache.go index 68e2c05f3..f0c8b4264 100644 --- a/pkg/resourcepolicy/resourcepolicy_cache.go +++ b/pkg/resourcepolicy/resourcepolicy_cache.go @@ -27,8 +27,8 @@ import ( "k8s.io/apimachinery/pkg/types" v1 "k8s.io/client-go/listers/core/v1" "k8s.io/client-go/util/workqueue" + resourcehelper "k8s.io/component-helpers/resource" "k8s.io/klog/v2" - "k8s.io/kubernetes/pkg/api/v1/resource" "k8s.io/kubernetes/pkg/scheduler/framework" ) @@ -56,7 +56,7 @@ func NewResourcePolicyCache( pl v1.PodLister, ss framework.NodeInfoLister, ) *resourcePolicyCache { - return &resourcePolicyCache{ + cache := &resourcePolicyCache{ rps: make(map[string]map[keyStr]*resourcePolicyInfo), pd2Rps: make(map[keyStr]keyStr), @@ -68,6 +68,8 @@ func NewResourcePolicyCache( pl: pl, ss: ss, } + go cache.updateLoop() + return 
cache } // get the lock outside @@ -96,6 +98,7 @@ func (rspc *resourcePolicyCache) Assume(cycleState *framework.CycleState, pod *c if node, ok := rspc.assumedPd2Node[podKey]; ok { return fmt.Errorf("PodAlreadyAssumed assumed node: %v", node) } + rspc.pd2Rps[podKey] = preFilterState.matchedInfo.ks rspi := preFilterState.matchedInfo for idx, sel := range rspi.nodeSelectors { @@ -132,14 +135,12 @@ func (rspc *resourcePolicyCache) Forget(cycleState *framework.CycleState, pod *c } } +// need to obtain lock outside func (rspc *resourcePolicyCache) AddOrUpdateBoundPod(p *corev1.Pod) { - rspc.processingLock.Lock() - defer rspc.processingLock.Unlock() - podKey := GetKeyStr(p.ObjectMeta) rspkey := GetManagedResourcePolicy(p) - assumedRspKey := rspc.pd2Rps[podKey] - if assumedRspKey != rspkey { + assumedRspKey, ok := rspc.pd2Rps[podKey] + if ok && assumedRspKey != rspkey { klog.ErrorS(fmt.Errorf("AssignedResourcePolicyNotMatch"), "bound pod is managed by another resourcepolicy", "assumed", assumedRspKey, "bound", rspkey) delete(rspc.pd2Rps, podKey) @@ -169,7 +170,7 @@ func (rspc *resourcePolicyCache) AddOrUpdateBoundPod(p *corev1.Pod) { return } - node, err := rspc.ss.Get(nodeName) + node, err := rspc.nl.Get(nodeName) if err != nil { klog.ErrorS(err, "failed to get node", "node", nodeName) return @@ -181,14 +182,14 @@ func (rspc *resourcePolicyCache) AddOrUpdateBoundPod(p *corev1.Pod) { } boundRsp.processingLock.Lock() defer boundRsp.processingLock.Unlock() - podRes := resource.PodRequests(p, resource.PodResourcesOptions{}) + podRes := resourcehelper.PodRequests(p, resourcehelper.PodResourcesOptions{}) for idx, sel := range boundRsp.nodeSelectors { - if !sel.Matches(labels.Set(node.Node().Labels)) { + if !sel.Matches(labels.Set(node.Labels)) { // TODO: remove pod from this count // do not remove node from unit by update node values continue } - boundRsp.removePodFromBoundOrAssumedPods(boundRsp.assumedPods, idx, labelKeyValue, node.Node().Name, podKey) + boundRsp.removePodFromBoundOrAssumedPods(boundRsp.assumedPods, idx, labelKeyValue, node.Name, podKey) boundRsp.addPodToBoundOrAssumedPods(boundRsp.boundPods, idx, labelKeyValue, nodeName, podKey, framework.NewResource(podRes)) } } @@ -209,7 +210,7 @@ func (rspc *resourcePolicyCache) DeleteBoundPod(p *corev1.Pod) { assumedRspKey := rspc.pd2Rps[podKey] assumedRsp := rspc.getResourcePolicyInfoByKey(assumedRspKey, p.Namespace) - if rsp != nil { + if assumedRsp != nil { valid, labelKeyValue := genLabelKeyValueForPod(assumedRsp.policy, p) if valid { assumedRsp.removePod(podKey, labelKeyValue) @@ -221,7 +222,7 @@ func (rspc *resourcePolicyCache) DeleteBoundPod(p *corev1.Pod) { func (rspc *resourcePolicyCache) DeleteResourcePolicy(rsp *v1alpha1.ResourcePolicy) { rspc.processingLock.Lock() - defer rspc.processingLock.Lock() + defer rspc.processingLock.Unlock() ns := rsp.Namespace rspKey := GetKeyStr(rsp.ObjectMeta) @@ -238,7 +239,7 @@ func (rspc *resourcePolicyCache) DeleteResourcePolicy(rsp *v1alpha1.ResourcePoli func (rspc *resourcePolicyCache) AddOrUpdateResPolicy(rsp *v1alpha1.ResourcePolicy) { rspc.processingLock.Lock() - defer rspc.processingLock.Lock() + defer rspc.processingLock.Unlock() ns := rsp.Namespace rspKey := GetKeyStr(rsp.ObjectMeta) @@ -278,7 +279,7 @@ func (rspc *resourcePolicyCache) AddOrUpdateResPolicy(rsp *v1alpha1.ResourcePoli rspc.wq.AddRateLimited(types.NamespacedName{Namespace: rsp.Namespace, Name: rsp.Name}) } -func (rspc *resourcePolicyCache) UpdateLoop() { +func (rspc *resourcePolicyCache) updateLoop() { for { item, shutdown := 
rspc.wq.Get() if shutdown { diff --git a/pkg/resourcepolicy/resourcepolicy_test.go b/pkg/resourcepolicy/resourcepolicy_test.go new file mode 100644 index 000000000..6c7acf843 --- /dev/null +++ b/pkg/resourcepolicy/resourcepolicy_test.go @@ -0,0 +1,81 @@ +package resourcepolicy + +import ( + "testing" + + "github.com/stretchr/testify/assert" + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" + framework "k8s.io/kubernetes/pkg/scheduler/framework" +) + +func TestFindAvailableUnitForNode(t *testing.T) { + node := &v1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-node", + Labels: map[string]string{ + "region": "us-east-1", + }, + }, + } + + nodeInfo := framework.NewNodeInfo() + nodeInfo.SetNode(node) + + state := &ResourcePolicyPreFilterState{ + nodeSelectos: []labels.Selector{ + labels.SelectorFromSet(labels.Set{"region": "us-east-1"}), + }, + currentCount: []int{0}, + maxCount: []int{1}, + resConsumption: []*framework.Resource{ + {MilliCPU: 500, Memory: 512}, + }, + maxConsumption: []*framework.Resource{ + {MilliCPU: 1000, Memory: 1024}, + }, + podRes: &framework.Resource{ + MilliCPU: 400, Memory: 256, + }, + } + + // Test case 1 + idx, notValid := findAvailableUnitForNode(nodeInfo, state) + assert.Equal(t, 0, idx) + assert.Empty(t, notValid) + + // Test case 2: No matching node selector + state.nodeSelectos[0] = labels.SelectorFromSet(labels.Set{"region": "us-west-1"}) + idx, notValid = findAvailableUnitForNode(nodeInfo, state) + assert.Equal(t, -1, idx) + assert.Empty(t, notValid) + + // Test case 3: Max count reached + state.nodeSelectos[0] = labels.SelectorFromSet(labels.Set{"region": "us-east-1"}) + state.currentCount[0] = 1 + idx, notValid = findAvailableUnitForNode(nodeInfo, state) + assert.Equal(t, -1, idx) + assert.Len(t, notValid, 1) + assert.Equal(t, "pod", notValid[0].res) + + // Test case 4: Resource consumption exceeds limit + state.currentCount[0] = 0 + state.resConsumption[0] = &framework.Resource{ + MilliCPU: 900, Memory: 900, + } + idx, notValid = findAvailableUnitForNode(nodeInfo, state) + assert.Equal(t, -1, idx) + assert.Len(t, notValid, 1) + assert.Equal(t, "cpu", notValid[0].res) + + // Test case 5: memory consumption exceeds limit + state.currentCount[0] = 0 + state.resConsumption[0] = &framework.Resource{ + MilliCPU: 100, Memory: 900, + } + idx, notValid = findAvailableUnitForNode(nodeInfo, state) + assert.Equal(t, -1, idx) + assert.Len(t, notValid, 1) + assert.Equal(t, "memory", notValid[0].res) +} diff --git a/pkg/resourcepolicy/resourcepolicyinfo.go b/pkg/resourcepolicy/resourcepolicyinfo.go index 59285a088..210bfc036 100644 --- a/pkg/resourcepolicy/resourcepolicyinfo.go +++ b/pkg/resourcepolicy/resourcepolicyinfo.go @@ -23,11 +23,10 @@ import ( "github.com/KunWuLuan/resourcepolicyapi/pkg/apis/scheduling/v1alpha1" "k8s.io/apimachinery/pkg/labels" - "k8s.io/apimachinery/pkg/selection" "k8s.io/apimachinery/pkg/util/sets" listersv1 "k8s.io/client-go/listers/core/v1" + resourcehelper "k8s.io/component-helpers/resource" "k8s.io/klog/v2" - "k8s.io/kubernetes/pkg/api/v1/resource" "k8s.io/kubernetes/pkg/scheduler/framework" ) @@ -36,6 +35,7 @@ type resourcePolicyInfo struct { // when processing is ture, means cache of the rspinfo is waiting for reconcile // do not schedule pod in this rsp. 
processing bool + cond *sync.Cond ks keyStr rv string @@ -61,19 +61,21 @@ type resourcePolicyInfo struct { // // key: pod.Namespace/pod.Name podResourceDetails map[keyStr]*framework.Resource - maxPodResources []framework.Resource + maxPodResources []*framework.Resource assumedPodConsumption []map[labelKeysValue]*framework.Resource } func newResourcePolicyInfo() *resourcePolicyInfo { - return &resourcePolicyInfo{ + rspinfo := &resourcePolicyInfo{ assumedPods: make(multiLevelPodSet, 0), boundPods: make(multiLevelPodSet, 0), assumedPodCount: make([]map[labelKeysValue]int, 0), podResourceDetails: make(map[keyStr]*framework.Resource), - maxPodResources: make([]framework.Resource, 0), + maxPodResources: make([]*framework.Resource, 0), assumedPodConsumption: make([]map[labelKeysValue]*framework.Resource, 0), } + rspinfo.cond = sync.NewCond(&rspinfo.processingLock) + return rspinfo } func (rspi *resourcePolicyInfo) complete(pl listersv1.PodLister, nl listersv1.NodeLister, ss framework.NodeInfoLister, @@ -91,22 +93,21 @@ func (rspi *resourcePolicyInfo) complete(pl listersv1.PodLister, nl listersv1.No // resourcepolicy has been initialized, pods can be scheduled again rspi.processing = false rspi.processingLock.Unlock() + rspi.cond.Broadcast() }() rspi.nodeSelectors = make([]labels.Selector, len(rspi.policy.Spec.Units)) + rspi.assumedPods = make(multiLevelPodSet, len(rspi.policy.Spec.Units)) + rspi.assumedPodConsumption = make([]map[labelKeysValue]*framework.Resource, len(rspi.policy.Spec.Units)) + rspi.maxPodResources = make([]*framework.Resource, len(rspi.policy.Spec.Units)) + rspi.boundPods = make(multiLevelPodSet, len(rspi.policy.Spec.Units)) + rspi.assumedPodCount = make([]map[labelKeysValue]int, len(rspi.policy.Spec.Units)) for idx, unit := range rspi.policy.Spec.Units { - selector := labels.NewSelector() - reqs, _ := labels.SelectorFromSet(unit.NodeSelector.MatchLabels).Requirements() - selector.Add(reqs...) 
- for _, exp := range unit.NodeSelector.MatchExpressions { - req, err := labels.NewRequirement(exp.Key, selection.Operator(exp.Operator), exp.Values) - if err != nil { - continue - } - selector.Add(*req) - } + selector := labels.SelectorFromSet(unit.NodeSelector) rspi.nodeSelectors[idx] = selector - + if len(unit.MaxResources) > 0 { + rspi.maxPodResources[idx] = framework.NewResource(unit.MaxResources) + } nodes, err := nl.List(selector) if err != nil { continue @@ -125,7 +126,8 @@ func (rspi *resourcePolicyInfo) complete(pl listersv1.PodLister, nl listersv1.No if !ok { continue } - res := resource.PodRequests(po.Pod, resource.PodResourcesOptions{}) + + res := resourcehelper.PodRequests(po.Pod, resourcehelper.PodResourcesOptions{}) rspi.addPodToBoundOrAssumedPods(rspi.boundPods, idx, labelKeyValue, no.Name, GetKeyStr(po.Pod.ObjectMeta), framework.NewResource(res)) } } @@ -135,6 +137,9 @@ func (rspi *resourcePolicyInfo) complete(pl listersv1.PodLister, nl listersv1.No // lock should be get outside // TODO: when pod resource is chanaged, assumedPodConsumption maybe wrong func (r *resourcePolicyInfo) addPodToBoundOrAssumedPods(ps multiLevelPodSet, index int, labelValues labelKeysValue, nodename string, podKey keyStr, res *framework.Resource) bool { + if len(ps[index]) == 0 { + ps[index] = make(map[string]map[labelKeysValue]sets.Set[keyStr]) + } podsetByValue := ps[index][nodename] if podsetByValue == nil { podsetByValue = make(map[labelKeysValue]sets.Set[keyStr]) @@ -176,7 +181,7 @@ func (r *resourcePolicyInfo) removePod(podKeyStr keyStr, labelKeyValue labelKeys podRes := r.podResourceDetails[podKeyStr] if podRes != nil { for idx := range r.assumedPodConsumption { - r.updateAssumedPodResource(idx, labelKeyValue, false, podRes) + r.updateAssumedPodCount(idx, labelKeyValue, -1, podRes) } delete(r.podResourceDetails, podKeyStr) } diff --git a/pkg/resourcepolicy/types.go b/pkg/resourcepolicy/types.go index 3dd787925..30c953c44 100644 --- a/pkg/resourcepolicy/types.go +++ b/pkg/resourcepolicy/types.go @@ -163,22 +163,31 @@ func subResources(res1, res2 *framework.Resource) { } } -func lessThan(res1, res2 *framework.Resource) (bool, string) { - if res1.MilliCPU > 0 && res1.MilliCPU >= res2.MilliCPU { - return false, "cpu" +func largeThan(res1, res2 *framework.Resource) (bool, string) { + if res1 == nil && res2 == nil { + return false, "" + } + if res1 == nil { + return false, "" + } + if res2 == nil { + return true, "" + } + if res1.MilliCPU > 0 && res1.MilliCPU > res2.MilliCPU { + return true, "cpu" } - if res1.Memory > 0 && res1.Memory >= res2.Memory { - return false, "memory" + if res1.Memory > 0 && res1.Memory > res2.Memory { + return true, "memory" } - if res1.EphemeralStorage > 0 && res1.EphemeralStorage >= res2.EphemeralStorage { - return false, "ephemeral-storage" + if res1.EphemeralStorage > 0 && res1.EphemeralStorage > res2.EphemeralStorage { + return true, "ephemeral-storage" } for k, v := range res1.ScalarResources { - if v2, ok := res2.ScalarResources[k]; !ok || ok && v > 0 && v >= v2 { - return false, string(k) + if v2, ok := res2.ScalarResources[k]; !ok || ok && v > 0 && v > v2 { + return true, string(k) } } - return true, "" + return false, "" } func SliceCopyInt(src []int) []int { From 6528fbe3fa07c512faaf6fe43d0d14e28e22ccb1 Mon Sep 17 00:00:00 2001 From: KunWuLuan Date: Sun, 27 Jul 2025 20:43:18 +0800 Subject: [PATCH 4/4] add kep Signed-off-by: KunWuLuan --- kep/594-resourcepolicy/README.md | 177 +++++++++++++++++++++++++++++++ kep/594-resourcepolicy/kep.yaml | 5 + 2 files 
changed, 182 insertions(+)
 create mode 100644 kep/594-resourcepolicy/README.md
 create mode 100644 kep/594-resourcepolicy/kep.yaml

diff --git a/kep/594-resourcepolicy/README.md b/kep/594-resourcepolicy/README.md
new file mode 100644
index 000000000..531f0ab94
--- /dev/null
+++ b/kep/594-resourcepolicy/README.md
@@ -0,0 +1,177 @@
+# Resource Policy
+
+## Table of Contents
+
+
+- [Summary](#summary)
+- [Motivation](#motivation)
+  - [Use Cases](#use-cases)
+  - [Goals](#goals)
+  - [Non-Goals](#non-goals)
+- [Proposal](#proposal)
+  - [API](#api)
+  - [Implementation Details](#implementation-details)
+    - [PreFilter](#prefilter)
+    - [Filter](#filter)
+    - [Score](#score)
+    - [PreBind](#prebind)
+- [Known limitations](#known-limitations)
+- [Test plans](#test-plans)
+- [Graduation criteria](#graduation-criteria)
+- [Feature enablement and rollback](#feature-enablement-and-rollback)
+
+
+## Summary
+This proposal introduces a plugin that lets users set priorities for different kinds of resources and define the maximum amount of each kind of resource a workload may consume.
+
+## Motivation
+A Kubernetes cluster typically consists of heterogeneous machines, with varying SKUs on CPU, memory, GPU, and pricing. To
+efficiently utilize the different resources available in the cluster, users can set priorities for machines of different
+types and configure resource allocations for different workloads. Additionally, they may prefer to delete pods running
+on low-priority nodes rather than those on high-priority ones.
+
+### Use Cases
+
+1. As an administrator of a Kubernetes cluster, I have some static but expensive VM instances and some dynamic but cheaper Spot
+instances in my cluster. I want to limit how much of each kind of resource the different workloads can consume in order to control cost.
+I want important workloads to be deployed on the static VM instances first so that they do not have to worry about being preempted. During business peaks, the Pods that are scaled up should be deployed on the cheap Spot instances, and at the end of the peak the Pods on Spot
+instances should be scaled down first.
+
+### Goals
+
+1. Develop a filter plugin to restrict the resource consumption on each kind of resource for different workloads.
+2. Develop a score plugin to favor nodes that belong to a higher-priority kind of resource.
+3. Automatically set deletion costs on Pods through a controller to control the scale-down order of workloads, as illustrated below.
+
+### Non-Goals
+
+1. The scheduler will not delete pods.
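+
+For illustration only (this controller is not part of the scheduler plugin itself), the scale-down ordering in Goal 3 could be driven by the standard `controller.kubernetes.io/pod-deletion-cost` annotation: Pods placed on later, lower-priority units are given a lower deletion cost so that the ReplicaSet controller removes them first. A hypothetical result could look like this:
+
+```yaml
+# Hypothetical annotation written by the deletion-cost controller;
+# the Pod name is illustrative and not part of this proposal.
+apiVersion: v1
+kind: Pod
+metadata:
+  name: nginx-on-spot-instance
+  annotations:
+    controller.kubernetes.io/pod-deletion-cost: "-100"
+```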
+
+## Proposal
+
+### API
+```yaml
+apiVersion: scheduling.sigs.x-k8s.io/v1alpha1
+kind: ResourcePolicy
+metadata:
+  name: xxx
+  namespace: xxx
+spec:
+  matchLabelKeys:
+  - pod-template-hash
+  podSelector:
+    key1: value1
+  strategy: prefer
+  units:
+  - name: unit1
+    max: 10
+    maxResources:
+      cpu: 10
+    nodeSelector:
+      key1: value1
+```
+
+```go
+type ResourcePolicy struct {
+	metav1.TypeMeta   `json:",inline"`
+	metav1.ObjectMeta `json:"metadata,omitempty"`
+
+	Spec   ResourcePolicySpec   `json:"spec"`
+	Status ResourcePolicyStatus `json:"status,omitempty"`
+}
+
+type ResourcePolicySpec struct {
+	// +optional
+	// +nullable
+	// +listType=atomic
+	Units []Unit `json:"units,omitempty" protobuf:"bytes,1,rep,name=units"`
+
+	Selector       map[string]string `json:"selector,omitempty" protobuf:"bytes,2,rep,name=selector"`
+	MatchLabelKeys []string          `json:"matchLabelKeys,omitempty" protobuf:"bytes,3,rep,name=matchLabelKeys"`
+}
+
+type Unit struct {
+	Max          *int32          `json:"max,omitempty" protobuf:"varint,1,opt,name=max"`
+	MaxResources v1.ResourceList `json:"maxResources,omitempty" protobuf:"bytes,2,rep,name=maxResources"`
+
+	NodeSelector map[string]string `json:"nodeSelector,omitempty" protobuf:"bytes,3,rep,name=nodeSelector"`
+
+	PodLabelsToAdd      map[string]string `json:"podLabels,omitempty" protobuf:"bytes,4,rep,name=podLabels"`
+	PodAnnotationsToAdd map[string]string `json:"podAnnotations,omitempty" protobuf:"bytes,5,rep,name=podAnnotations"`
+}
+
+type ResourcePolicyStatus struct {
+	Pods           []int64      `json:"pods,omitempty"`
+	LastUpdateTime *metav1.Time `json:"lastUpdateTime,omitempty"`
+}
+```
+
+A Pod is matched by a ResourcePolicy in the same namespace when it matches the `.spec.podSelector`.
+ResourcePolicies never match pods in a different namespace, and one pod cannot be matched by more than one ResourcePolicy.
+
+Matched pods can only be scheduled on the units defined in `.spec.units`. Each item in `.spec.units` describes one kind of resource in the cluster: the set of nodes that match its `NodeSelector`.
+
+Pods are scheduled on the units in the order defined by `.spec.units`.
+`.spec.units[].max` defines the maximum number of pods that can be scheduled on each unit. If `.spec.units[].max` is not set, pods can always be scheduled on the unit unless it runs out of resources.
+`.spec.units[].maxResources` defines the maximum amount of resources that can be consumed on each unit. If `.spec.units[].maxResources` is not set, pods can always be scheduled on the unit unless it runs out of resources.
+
+`.spec.matchLabelKeys` indicates how the pods matched by `podSelector` are grouped; its behavior is like
+`.spec.matchLabelKeys` in `PodTopologySpread`.
+
+### Implementation Details
+
+#### PreFilter
+PreFilter checks whether the current pod matches exactly one resource policy. If not, PreFilter rejects the pod.
+If it does, PreFilter counts the pods already placed on each unit to determine which units are available for the pod
+and writes this information into the cycleState.
+
+#### Filter
+Filter checks whether the node belongs to an available unit. If the node does not belong to any unit, the pod is unschedulable on that node.
+
+Besides, Filter checks whether the pods already scheduled on the unit violate the quantity constraint.
+If the number of pods has reached `.spec.units[].max`, all the nodes in the unit are marked unschedulable.
+
+#### Score
+
+The node score is `100 - (index of the unit)`, so nodes in earlier units are preferred.
+
+#### PreBind
+
+PreBind adds annotations and labels to the pod to ensure the pods can later be scaled down in the order of the units.
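+
+Taken together, the steps above implement a "first unit that still has room" policy. The snippet below is a minimal, self-contained sketch of that logic (illustrative only, not the plugin's actual code; the `unitState` type and its field names are invented for this example):
+
+```go
+package main
+
+import "fmt"
+
+// unitState is a hypothetical, simplified view of one unit of a ResourcePolicy.
+type unitState struct {
+	maxPods      int   // 0 means unlimited
+	currentPods  int
+	maxCPUMilli  int64 // 0 means unlimited
+	usedCPUMilli int64
+}
+
+// pickUnit returns the index of the first unit that can still accept a pod
+// requesting podCPUMilli, or -1 if no unit is available (Filter fails).
+func pickUnit(units []unitState, podCPUMilli int64) int {
+	for i, u := range units {
+		if u.maxPods > 0 && u.currentPods >= u.maxPods {
+			continue // quantity constraint already reached
+		}
+		if u.maxCPUMilli > 0 && u.usedCPUMilli+podCPUMilli > u.maxCPUMilli {
+			continue // resource budget would be exceeded
+		}
+		return i
+	}
+	return -1
+}
+
+// score mirrors the Score rule above: earlier units are preferred.
+func score(unitIndex int) int { return 100 - unitIndex }
+
+func main() {
+	units := []unitState{
+		{maxPods: 1, currentPods: 1},            // unit1 is already full
+		{maxCPUMilli: 4000, usedCPUMilli: 3500}, // unit2 still has 500m CPU budget
+	}
+	if idx := pickUnit(units, 400); idx >= 0 {
+		fmt.Printf("schedule on unit %d with score %d\n", idx, score(idx)) // unit 1, score 99
+	}
+}
+```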
+
+## Known limitations
+
+- Currently, deletion costs only take effect on Deployment workloads.
+
+## Test plans
+
+1. Add detailed unit and integration tests for the plugin and controller.
+2. Add basic e2e tests to ensure all components work together.
+
+## Graduation criteria
+
+This plugin is only active when users enable it in the scheduler framework and create a ResourcePolicy for their pods.
+So it is safe to graduate to beta.
+
+* Beta
+- [ ] Add node E2E tests.
+- [ ] Provide beta-level documentation.
+
+## Feature enablement and rollback
+
+Enable the ResourcePolicy plugin under `multiPoint` in the scheduler configuration to enable this feature, like this:
+
+```yaml
+apiVersion: kubescheduler.config.k8s.io/v1
+kind: KubeSchedulerConfiguration
+leaderElection:
+  leaderElect: false
+profiles:
+- schedulerName: default-scheduler
+  plugins:
+    multiPoint:
+      enabled:
+      - name: ResourcePolicy
+```
+
+
diff --git a/kep/594-resourcepolicy/kep.yaml b/kep/594-resourcepolicy/kep.yaml
new file mode 100644
index 000000000..32b3a3f9a
--- /dev/null
+++ b/kep/594-resourcepolicy/kep.yaml
@@ -0,0 +1,5 @@
+title: Resourcepolicy
+kep-number: 594
+authors:
+  - "@KunWuLuan"
+  - "@fjding"