diff --git a/go.mod b/go.mod index dcae1a2c3b..caea451f5c 100644 --- a/go.mod +++ b/go.mod @@ -38,18 +38,18 @@ require ( golang.org/x/time v0.13.0 google.golang.org/grpc v1.75.1 gopkg.in/yaml.v2 v2.4.0 - k8s.io/api v0.34.0 - k8s.io/apiextensions-apiserver v0.34.0 - k8s.io/apimachinery v0.34.0 - k8s.io/apiserver v0.34.0 - k8s.io/client-go v0.34.0 - k8s.io/code-generator v0.34.0 - k8s.io/component-base v0.34.0 + k8s.io/api v0.34.1 + k8s.io/apiextensions-apiserver v0.34.1 + k8s.io/apimachinery v0.34.1 + k8s.io/apiserver v0.34.1 + k8s.io/client-go v0.34.1 + k8s.io/code-generator v0.34.1 + k8s.io/component-base v0.34.1 k8s.io/klog v1.0.0 - k8s.io/kube-aggregator v0.34.0 + k8s.io/kube-aggregator v0.34.1 k8s.io/kube-openapi v0.0.0-20250710124328-f3f2b991d03b k8s.io/utils v0.0.0-20250604170112-4c0f3b243397 - sigs.k8s.io/controller-runtime v0.22.0 + sigs.k8s.io/controller-runtime v0.22.2 sigs.k8s.io/controller-tools v0.19.0 ) @@ -179,7 +179,7 @@ require ( gopkg.in/yaml.v3 v3.0.1 // indirect k8s.io/gengo/v2 v2.0.0-20250604051438-85fd79dbfd9f // indirect k8s.io/klog/v2 v2.130.1 // indirect - k8s.io/kms v0.34.0 // indirect + k8s.io/kms v0.34.1 // indirect sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.33.0 // indirect sigs.k8s.io/json v0.0.0-20241014173422-cfa47c3a1cc8 // indirect sigs.k8s.io/randfill v1.0.0 // indirect diff --git a/go.sum b/go.sum index 7ac539e9cc..9a6ff37d38 100644 --- a/go.sum +++ b/go.sum @@ -1098,24 +1098,24 @@ honnef.co/go/tools v0.0.1-2020.1.3/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9 honnef.co/go/tools v0.0.1-2020.1.4/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9vFzvIQ3k= k8s.io/api v0.24.0/go.mod h1:5Jl90IUrJHUJYEMANRURMiVvJ0g7Ax7r3R1bqO8zx8I= k8s.io/api v0.25.0/go.mod h1:ttceV1GyV1i1rnmvzT3BST08N6nGt+dudGrquzVQWPk= -k8s.io/api v0.34.0 h1:L+JtP2wDbEYPUeNGbeSa/5GwFtIA662EmT2YSLOkAVE= -k8s.io/api v0.34.0/go.mod h1:YzgkIzOOlhl9uwWCZNqpw6RJy9L2FK4dlJeayUoydug= -k8s.io/apiextensions-apiserver v0.34.0 h1:B3hiB32jV7BcyKcMU5fDaDxk882YrJ1KU+ZSkA9Qxoc= -k8s.io/apiextensions-apiserver v0.34.0/go.mod h1:hLI4GxE1BDBy9adJKxUxCEHBGZtGfIg98Q+JmTD7+g0= +k8s.io/api v0.34.1 h1:jC+153630BMdlFukegoEL8E/yT7aLyQkIVuwhmwDgJM= +k8s.io/api v0.34.1/go.mod h1:SB80FxFtXn5/gwzCoN6QCtPD7Vbu5w2n1S0J5gFfTYk= +k8s.io/apiextensions-apiserver v0.34.1 h1:NNPBva8FNAPt1iSVwIE0FsdrVriRXMsaWFMqJbII2CI= +k8s.io/apiextensions-apiserver v0.34.1/go.mod h1:hP9Rld3zF5Ay2Of3BeEpLAToP+l4s5UlxiHfqRaRcMc= k8s.io/apimachinery v0.24.0/go.mod h1:82Bi4sCzVBdpYjyI4jY6aHX+YCUchUIrZrXKedjd2UM= k8s.io/apimachinery v0.25.0/go.mod h1:qMx9eAk0sZQGsXGu86fab8tZdffHbwUfsvzqKn4mfB0= -k8s.io/apimachinery v0.34.0 h1:eR1WO5fo0HyoQZt1wdISpFDffnWOvFLOOeJ7MgIv4z0= -k8s.io/apimachinery v0.34.0/go.mod h1:/GwIlEcWuTX9zKIg2mbw0LRFIsXwrfoVxn+ef0X13lw= -k8s.io/apiserver v0.34.0 h1:Z51fw1iGMqN7uJ1kEaynf2Aec1Y774PqU+FVWCFV3Jg= -k8s.io/apiserver v0.34.0/go.mod h1:52ti5YhxAvewmmpVRqlASvaqxt0gKJxvCeW7ZrwgazQ= +k8s.io/apimachinery v0.34.1 h1:dTlxFls/eikpJxmAC7MVE8oOeP1zryV7iRyIjB0gky4= +k8s.io/apimachinery v0.34.1/go.mod h1:/GwIlEcWuTX9zKIg2mbw0LRFIsXwrfoVxn+ef0X13lw= +k8s.io/apiserver v0.34.1 h1:U3JBGdgANK3dfFcyknWde1G6X1F4bg7PXuvlqt8lITA= +k8s.io/apiserver v0.34.1/go.mod h1:eOOc9nrVqlBI1AFCvVzsob0OxtPZUCPiUJL45JOTBG0= k8s.io/client-go v0.24.0/go.mod h1:VFPQET+cAFpYxh6Bq6f4xyMY80G6jKKktU6G0m00VDw= -k8s.io/client-go v0.34.0 h1:YoWv5r7bsBfb0Hs2jh8SOvFbKzzxyNo0nSb0zC19KZo= -k8s.io/client-go v0.34.0/go.mod h1:ozgMnEKXkRjeMvBZdV1AijMHLTh3pbACPvK7zFR+QQY= +k8s.io/client-go v0.34.1 h1:ZUPJKgXsnKwVwmKKdPfw4tB58+7/Ik3CrjOEhsiZ7mY= +k8s.io/client-go v0.34.1/go.mod h1:kA8v0FP+tk6sZA0yKLRG67LWjqufAoSHA2xVGKw9Of8= k8s.io/code-generator v0.24.0/go.mod h1:dpVhs00hTuTdTY6jvVxvTFCk6gSMrtfRydbhZwHI15w= -k8s.io/code-generator v0.34.0 h1:Ze2i1QsvUprIlX3oHiGv09BFQRLCz+StA8qKwwFzees= -k8s.io/code-generator v0.34.0/go.mod h1:Py2+4w2HXItL8CGhks8uI/wS3Y93wPKO/9mBQUYNua0= -k8s.io/component-base v0.34.0 h1:bS8Ua3zlJzapklsB1dZgjEJuJEeHjj8yTu1gxE2zQX8= -k8s.io/component-base v0.34.0/go.mod h1:RSCqUdvIjjrEm81epPcjQ/DS+49fADvGSCkIP3IC6vg= +k8s.io/code-generator v0.34.1 h1:WpphT26E+j7tEgIUfFr5WfbJrktCGzB3JoJH9149xYc= +k8s.io/code-generator v0.34.1/go.mod h1:DeWjekbDnJWRwpw3s0Jat87c+e0TgkxoR4ar608yqvg= +k8s.io/component-base v0.34.1 h1:v7xFgG+ONhytZNFpIz5/kecwD+sUhVE6HU7qQUiRM4A= +k8s.io/component-base v0.34.1/go.mod h1:mknCpLlTSKHzAQJJnnHVKqjxR7gBeHRv0rPXA7gdtQ0= k8s.io/gengo v0.0.0-20210813121822-485abfe95c7c/go.mod h1:FiNAH4ZV3gBg2Kwh89tzAEV2be7d5xI0vBa/VySYy3E= k8s.io/gengo v0.0.0-20211129171323-c02415ce4185/go.mod h1:FiNAH4ZV3gBg2Kwh89tzAEV2be7d5xI0vBa/VySYy3E= k8s.io/gengo/v2 v2.0.0-20250604051438-85fd79dbfd9f h1:SLb+kxmzfA87x4E4brQzB33VBbT2+x7Zq9ROIHmGn9Q= @@ -1128,10 +1128,10 @@ k8s.io/klog/v2 v2.60.1/go.mod h1:y1WjHnz7Dj687irZUWR/WLkLc5N1YHtjLdmgWjndZn0= k8s.io/klog/v2 v2.70.1/go.mod h1:y1WjHnz7Dj687irZUWR/WLkLc5N1YHtjLdmgWjndZn0= k8s.io/klog/v2 v2.130.1 h1:n9Xl7H1Xvksem4KFG4PYbdQCQxqc/tTUyrgXaOhHSzk= k8s.io/klog/v2 v2.130.1/go.mod h1:3Jpz1GvMt720eyJH1ckRHK1EDfpxISzJ7I9OYgaDtPE= -k8s.io/kms v0.34.0 h1:u+/rcxQ3Jr7gC9AY5nXuEnBcGEB7ZOIJ9cdLdyHyEjQ= -k8s.io/kms v0.34.0/go.mod h1:s1CFkLG7w9eaTYvctOxosx88fl4spqmixnNpys0JAtM= -k8s.io/kube-aggregator v0.34.0 h1:XE4u+HOYkj0g44sblhTtPv+QyIIK7sJxrIlia0731kE= -k8s.io/kube-aggregator v0.34.0/go.mod h1:GIUqdChXVC448Vp2Wgxf0m6fir7Xt3A2TAZcs2JNG1Y= +k8s.io/kms v0.34.1 h1:iCFOvewDPzWM9fMTfyIPO+4MeuZ0tcZbugxLNSHFG4w= +k8s.io/kms v0.34.1/go.mod h1:s1CFkLG7w9eaTYvctOxosx88fl4spqmixnNpys0JAtM= +k8s.io/kube-aggregator v0.34.1 h1:WNLV0dVNoFKmuyvdWLd92iDSyD/TSTjqwaPj0U9XAEU= +k8s.io/kube-aggregator v0.34.1/go.mod h1:RU8j+5ERfp0h+gIvWtxRPfsa5nK7rboDm8RST8BJfYQ= k8s.io/kube-openapi v0.0.0-20220328201542-3ee0da9b0b42/go.mod h1:Z/45zLw8lUo4wdiUkI+v/ImEGAvu3WatcZl3lPMR4Rk= k8s.io/kube-openapi v0.0.0-20220803162953-67bda5d908f1/go.mod h1:C/N6wCaBHeBHkHUesQOQy2/MZqGgMAFPqGsGQLdbZBU= k8s.io/kube-openapi v0.0.0-20250710124328-f3f2b991d03b h1:MloQ9/bdJyIu9lb1PzujOPolHyvO06MXG5TUIj2mNAA= @@ -1148,8 +1148,8 @@ rsc.io/quote/v3 v3.1.0/go.mod h1:yEA65RcK8LyAZtP9Kv3t0HmxON59tX3rD+tICJqUlj0= rsc.io/sampler v1.3.0/go.mod h1:T1hPZKmBbMNahiBKFy5HrXp6adAjACjK9JXDnKaTXpA= sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.33.0 h1:qPrZsv1cwQiFeieFlRqT627fVZ+tyfou/+S5S0H5ua0= sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.33.0/go.mod h1:Ve9uj1L+deCXFrPOk1LpFXqTg7LCFzFso6PA48q/XZw= -sigs.k8s.io/controller-runtime v0.22.0 h1:mTOfibb8Hxwpx3xEkR56i7xSjB+nH4hZG37SrlCY5e0= -sigs.k8s.io/controller-runtime v0.22.0/go.mod h1:FwiwRjkRPbiN+zp2QRp7wlTCzbUXxZ/D4OzuQUDwBHY= +sigs.k8s.io/controller-runtime v0.22.2 h1:cK2l8BGWsSWkXz09tcS4rJh95iOLney5eawcK5A33r4= +sigs.k8s.io/controller-runtime v0.22.2/go.mod h1:+QX1XUpTXN4mLoblf4tqr5CQcyHPAki2HLXqQMY6vh8= sigs.k8s.io/controller-tools v0.19.0 h1:OU7jrPPiZusryu6YK0jYSjPqg8Vhf8cAzluP9XGI5uk= sigs.k8s.io/controller-tools v0.19.0/go.mod h1:y5HY/iNDFkmFla2CfQoVb2AQXMsBk4ad84iR1PLANB0= sigs.k8s.io/json v0.0.0-20211208200746-9f7c6b3444d2/go.mod h1:B+TnT182UBxE84DiCz4CVE26eOSDAeYCpfDnC2kdKMY= diff --git a/vendor/k8s.io/apiextensions-apiserver/pkg/apiserver/validation/formats.go b/vendor/k8s.io/apiextensions-apiserver/pkg/apiserver/validation/formats.go index c11f1c627a..5c0397909b 100644 --- a/vendor/k8s.io/apiextensions-apiserver/pkg/apiserver/validation/formats.go +++ b/vendor/k8s.io/apiextensions-apiserver/pkg/apiserver/validation/formats.go @@ -113,9 +113,11 @@ func GetUnrecognizedFormats(schema *spec.Schema, compatibilityVersion *version.V return unrecognizedFormats } - normalized := strings.ReplaceAll(schema.Format, "-", "") // go-openapi default format name normalization - if !supportedFormatsAtVersion(compatibilityVersion).supported.Has(normalized) { - unrecognizedFormats = append(unrecognizedFormats, schema.Format) + if len(schema.Type) == 1 && schema.Type[0] == "string" { + normalized := strings.ReplaceAll(schema.Format, "-", "") // go-openapi default format name normalization + if !supportedFormatsAtVersion(compatibilityVersion).supported.Has(normalized) { + unrecognizedFormats = append(unrecognizedFormats, schema.Format) + } } return unrecognizedFormats diff --git a/vendor/k8s.io/apiserver/pkg/storage/etcd3/stats.go b/vendor/k8s.io/apiserver/pkg/storage/etcd3/stats.go index 227c46e10f..9f0b3f78e4 100644 --- a/vendor/k8s.io/apiserver/pkg/storage/etcd3/stats.go +++ b/vendor/k8s.io/apiserver/pkg/storage/etcd3/stats.go @@ -18,6 +18,7 @@ package etcd3 import ( "context" + "errors" "strings" "sync" @@ -78,6 +79,8 @@ type sizeRevision struct { revision int64 } +var errStatsDisabled = errors.New("key size stats disabled") + func (sc *statsCache) Stats(ctx context.Context) (storage.Stats, error) { keys, err := sc.GetKeys(ctx) if err != nil { @@ -100,6 +103,9 @@ func (sc *statsCache) GetKeys(ctx context.Context) ([]string, error) { getKeys := sc.getKeys sc.getKeysLock.Unlock() + if getKeys == nil { + return nil, errStatsDisabled + } // Don't execute getKeys under lock. return getKeys(ctx) } @@ -133,7 +139,10 @@ func (sc *statsCache) cleanKeysIfNeeded(ctx context.Context) { } keys, err := sc.GetKeys(ctx) if err != nil { - klog.InfoS("Error getting keys", "err", err) + if !errors.Is(err, errStatsDisabled) { + klog.InfoS("Error getting keys", "err", err) + } + return } sc.keysLock.Lock() defer sc.keysLock.Unlock() diff --git a/vendor/k8s.io/apiserver/pkg/storage/etcd3/store.go b/vendor/k8s.io/apiserver/pkg/storage/etcd3/store.go index 6fb9fe9aa5..d8b7a00a2b 100644 --- a/vendor/k8s.io/apiserver/pkg/storage/etcd3/store.go +++ b/vendor/k8s.io/apiserver/pkg/storage/etcd3/store.go @@ -186,8 +186,9 @@ func New(c *kubernetes.Client, compactor Compactor, codec runtime.Codec, newFunc newListFunc: newListFunc, compactor: compactor, } - if utilfeature.DefaultFeatureGate.Enabled(features.SizeBasedListCostEstimate) { - stats := newStatsCache(pathPrefix, s.getKeys) + // Collecting stats requires properly set resourcePrefix to call getKeys. + if resourcePrefix != "" && utilfeature.DefaultFeatureGate.Enabled(features.SizeBasedListCostEstimate) { + stats := newStatsCache(pathPrefix, nil) s.stats = stats w.stats = stats } @@ -631,7 +632,10 @@ func getNewItemFunc(listObj runtime.Object, v reflect.Value) func() runtime.Obje func (s *store) Stats(ctx context.Context) (stats storage.Stats, err error) { if s.stats != nil { - return s.stats.Stats(ctx) + stats, err := s.stats.Stats(ctx) + if !errors.Is(err, errStatsDisabled) { + return stats, err + } } startTime := time.Now() prefix, err := s.prepareKey(s.resourcePrefix) diff --git a/vendor/modules.txt b/vendor/modules.txt index e8db2bfbdc..57772d48b5 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -1018,7 +1018,7 @@ gopkg.in/yaml.v2 # gopkg.in/yaml.v3 v3.0.1 ## explicit gopkg.in/yaml.v3 -# k8s.io/api v0.34.0 +# k8s.io/api v0.34.1 ## explicit; go 1.24.0 k8s.io/api/admission/v1 k8s.io/api/admission/v1beta1 @@ -1080,7 +1080,7 @@ k8s.io/api/storage/v1 k8s.io/api/storage/v1alpha1 k8s.io/api/storage/v1beta1 k8s.io/api/storagemigration/v1alpha1 -# k8s.io/apiextensions-apiserver v0.34.0 +# k8s.io/apiextensions-apiserver v0.34.1 ## explicit; go 1.24.0 k8s.io/apiextensions-apiserver/pkg/apihelpers k8s.io/apiextensions-apiserver/pkg/apis/apiextensions @@ -1107,7 +1107,7 @@ k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset/typed/apiextension k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset/typed/apiextensions/v1beta1 k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset/typed/apiextensions/v1beta1/fake k8s.io/apiextensions-apiserver/pkg/features -# k8s.io/apimachinery v0.34.0 +# k8s.io/apimachinery v0.34.1 ## explicit; go 1.24.0 k8s.io/apimachinery/pkg/api/equality k8s.io/apimachinery/pkg/api/errors @@ -1179,7 +1179,7 @@ k8s.io/apimachinery/pkg/version k8s.io/apimachinery/pkg/watch k8s.io/apimachinery/third_party/forked/golang/json k8s.io/apimachinery/third_party/forked/golang/reflect -# k8s.io/apiserver v0.34.0 +# k8s.io/apiserver v0.34.1 ## explicit; go 1.24.0 k8s.io/apiserver/pkg/admission k8s.io/apiserver/pkg/admission/configuration @@ -1341,7 +1341,7 @@ k8s.io/apiserver/plugin/pkg/audit/webhook k8s.io/apiserver/plugin/pkg/authenticator/token/webhook k8s.io/apiserver/plugin/pkg/authorizer/webhook k8s.io/apiserver/plugin/pkg/authorizer/webhook/metrics -# k8s.io/client-go v0.34.0 +# k8s.io/client-go v0.34.1 ## explicit; go 1.24.0 k8s.io/client-go/applyconfigurations k8s.io/client-go/applyconfigurations/admissionregistration/v1 @@ -1686,7 +1686,7 @@ k8s.io/client-go/util/homedir k8s.io/client-go/util/keyutil k8s.io/client-go/util/retry k8s.io/client-go/util/workqueue -# k8s.io/code-generator v0.34.0 +# k8s.io/code-generator v0.34.1 ## explicit; go 1.24.0 k8s.io/code-generator k8s.io/code-generator/cmd/applyconfiguration-gen @@ -1722,7 +1722,7 @@ k8s.io/code-generator/cmd/register-gen/generators k8s.io/code-generator/pkg/namer k8s.io/code-generator/pkg/util k8s.io/code-generator/third_party/forked/golang/reflect -# k8s.io/component-base v0.34.0 +# k8s.io/component-base v0.34.1 ## explicit; go 1.24.0 k8s.io/component-base/cli/flag k8s.io/component-base/compatibility @@ -1769,13 +1769,13 @@ k8s.io/klog/v2/internal/severity k8s.io/klog/v2/internal/sloghandler k8s.io/klog/v2/internal/verbosity k8s.io/klog/v2/textlogger -# k8s.io/kms v0.34.0 +# k8s.io/kms v0.34.1 ## explicit; go 1.24.0 k8s.io/kms/apis/v1beta1 k8s.io/kms/apis/v2 k8s.io/kms/pkg/service k8s.io/kms/pkg/util -# k8s.io/kube-aggregator v0.34.0 +# k8s.io/kube-aggregator v0.34.1 ## explicit; go 1.24.0 k8s.io/kube-aggregator/pkg/apis/apiregistration k8s.io/kube-aggregator/pkg/apis/apiregistration/v1 @@ -1840,7 +1840,7 @@ sigs.k8s.io/apiserver-network-proxy/konnectivity-client/pkg/client sigs.k8s.io/apiserver-network-proxy/konnectivity-client/pkg/client/metrics sigs.k8s.io/apiserver-network-proxy/konnectivity-client/pkg/common/metrics sigs.k8s.io/apiserver-network-proxy/konnectivity-client/proto/client -# sigs.k8s.io/controller-runtime v0.22.0 +# sigs.k8s.io/controller-runtime v0.22.2 ## explicit; go 1.24.0 sigs.k8s.io/controller-runtime sigs.k8s.io/controller-runtime/pkg/builder diff --git a/vendor/sigs.k8s.io/controller-runtime/pkg/client/fake/client.go b/vendor/sigs.k8s.io/controller-runtime/pkg/client/fake/client.go index 45f9e00e18..f88a44edd2 100644 --- a/vendor/sigs.k8s.io/controller-runtime/pkg/client/fake/client.go +++ b/vendor/sigs.k8s.io/controller-runtime/pkg/client/fake/client.go @@ -141,6 +141,7 @@ type ClientBuilder struct { interceptorFuncs *interceptor.Funcs typeConverters []managedfields.TypeConverter returnManagedFields bool + isBuilt bool // indexes maps each GroupVersionKind (GVK) to the indexes registered for that GVK. // The inner map maps from index name to IndexerFunc. @@ -267,6 +268,9 @@ func (f *ClientBuilder) WithReturnManagedFields() *ClientBuilder { // Build builds and returns a new fake client. func (f *ClientBuilder) Build() client.WithWatch { + if f.isBuilt { + panic("Build() must not be called multiple times when creating a ClientBuilder") + } if f.scheme == nil { f.scheme = scheme.Scheme } @@ -344,6 +348,7 @@ func (f *ClientBuilder) Build() client.WithWatch { result = interceptor.NewClient(result, *f.interceptorFuncs) } + f.isBuilt = true return result } diff --git a/vendor/sigs.k8s.io/controller-runtime/pkg/client/patch.go b/vendor/sigs.k8s.io/controller-runtime/pkg/client/patch.go index ec55861080..b99d7663bd 100644 --- a/vendor/sigs.k8s.io/controller-runtime/pkg/client/patch.go +++ b/vendor/sigs.k8s.io/controller-runtime/pkg/client/patch.go @@ -28,7 +28,10 @@ import ( var ( // Apply uses server-side apply to patch the given object. // - // Deprecated: Use client.Client.Apply() instead. + // This should now only be used to patch sub resources, e.g. with client.Client.Status().Patch(). + // Use client.Client.Apply() instead of client.Client.Patch(..., client.Apply, ...) + // This will be deprecated once the Apply method has been added for sub resources. + // See the following issue for more details: https://github.com/kubernetes-sigs/controller-runtime/issues/3183 Apply Patch = applyPatch{} // Merge uses the raw object as a merge patch, without modifications. diff --git a/vendor/sigs.k8s.io/controller-runtime/pkg/controller/priorityqueue/priorityqueue.go b/vendor/sigs.k8s.io/controller-runtime/pkg/controller/priorityqueue/priorityqueue.go index 49942186c0..98df84c56b 100644 --- a/vendor/sigs.k8s.io/controller-runtime/pkg/controller/priorityqueue/priorityqueue.go +++ b/vendor/sigs.k8s.io/controller-runtime/pkg/controller/priorityqueue/priorityqueue.go @@ -1,6 +1,7 @@ package priorityqueue import ( + "math" "sync" "sync/atomic" "time" @@ -206,6 +207,7 @@ func (w *priorityqueue[T]) spin() { blockForever := make(chan time.Time) var nextReady <-chan time.Time nextReady = blockForever + var nextItemReadyAt time.Time for { select { @@ -213,10 +215,10 @@ func (w *priorityqueue[T]) spin() { return case <-w.itemOrWaiterAdded: case <-nextReady: + nextReady = blockForever + nextItemReadyAt = time.Time{} } - nextReady = blockForever - func() { w.lock.Lock() defer w.lock.Unlock() @@ -227,39 +229,67 @@ func (w *priorityqueue[T]) spin() { // manipulating the tree from within Ascend might lead to panics, so // track what we want to delete and do it after we are done ascending. var toDelete []*item[T] - w.queue.Ascend(func(item *item[T]) bool { - if item.ReadyAt != nil { - if readyAt := item.ReadyAt.Sub(w.now()); readyAt > 0 { - nextReady = w.tick(readyAt) - return false + + var key T + + // Items in the queue tree are sorted first by priority and second by readiness, so + // items with a lower priority might be ready further down in the queue. + // We iterate through the priorities high to low until we find a ready item + pivot := item[T]{ + Key: key, + AddedCounter: 0, + Priority: math.MaxInt, + ReadyAt: nil, + } + + for { + pivotChange := false + + w.queue.AscendGreaterOrEqual(&pivot, func(item *item[T]) bool { + // Item is locked, we can not hand it out + if w.locked.Has(item.Key) { + return true } - if !w.becameReady.Has(item.Key) { - w.metrics.add(item.Key, item.Priority) - w.becameReady.Insert(item.Key) + + if item.ReadyAt != nil { + if readyAt := item.ReadyAt.Sub(w.now()); readyAt > 0 { + if nextItemReadyAt.After(*item.ReadyAt) || nextItemReadyAt.IsZero() { + nextReady = w.tick(readyAt) + nextItemReadyAt = *item.ReadyAt + } + + // Adjusting the pivot item moves the ascend to the next lower priority + pivot.Priority = item.Priority - 1 + pivotChange = true + return false + } + if !w.becameReady.Has(item.Key) { + w.metrics.add(item.Key, item.Priority) + w.becameReady.Insert(item.Key) + } } - } - if w.waiters.Load() == 0 { - // Have to keep iterating here to ensure we update metrics - // for further items that became ready and set nextReady. - return true - } + if w.waiters.Load() == 0 { + // Have to keep iterating here to ensure we update metrics + // for further items that became ready and set nextReady. + return true + } - // Item is locked, we can not hand it out - if w.locked.Has(item.Key) { - return true - } + w.metrics.get(item.Key, item.Priority) + w.locked.Insert(item.Key) + w.waiters.Add(-1) + delete(w.items, item.Key) + toDelete = append(toDelete, item) + w.becameReady.Delete(item.Key) + w.get <- *item - w.metrics.get(item.Key, item.Priority) - w.locked.Insert(item.Key) - w.waiters.Add(-1) - delete(w.items, item.Key) - toDelete = append(toDelete, item) - w.becameReady.Delete(item.Key) - w.get <- *item + return true + }) - return true - }) + if !pivotChange { + break + } + } for _, item := range toDelete { w.queue.Delete(item) @@ -290,9 +320,18 @@ func (w *priorityqueue[T]) GetWithPriority() (_ T, priority int, shutdown bool) w.notifyItemOrWaiterAdded() - item := <-w.get - - return item.Key, item.Priority, w.shutdown.Load() + select { + case <-w.done: + // Return if the queue was shutdown while we were already waiting for an item here. + // For example controller workers are continuously calling GetWithPriority and + // GetWithPriority is blocking the workers if there are no items in the queue. + // If the controller and accordingly the queue is then shut down, without this code + // branch the controller workers remain blocked here and are unable to shut down. + var zero T + return zero, 0, true + case item := <-w.get: + return item.Key, item.Priority, w.shutdown.Load() + } } func (w *priorityqueue[T]) Get() (item T, shutdown bool) { @@ -378,6 +417,9 @@ func (w *priorityqueue[T]) logState() { } func less[T comparable](a, b *item[T]) bool { + if a.Priority != b.Priority { + return a.Priority > b.Priority + } if a.ReadyAt == nil && b.ReadyAt != nil { return true } @@ -387,9 +429,6 @@ func less[T comparable](a, b *item[T]) bool { if a.ReadyAt != nil && b.ReadyAt != nil && !a.ReadyAt.Equal(*b.ReadyAt) { return a.ReadyAt.Before(*b.ReadyAt) } - if a.Priority != b.Priority { - return a.Priority > b.Priority - } return a.AddedCounter < b.AddedCounter } @@ -417,4 +456,5 @@ type bTree[T any] interface { ReplaceOrInsert(item T) (_ T, _ bool) Delete(item T) (T, bool) Ascend(iterator btree.ItemIteratorG[T]) + AscendGreaterOrEqual(pivot T, iterator btree.ItemIteratorG[T]) }