From f4b11d823b4c963fdc0ec782f90dffc22ba72635 Mon Sep 17 00:00:00 2001 From: RainbowMango Date: Fri, 14 Nov 2025 14:32:27 +0800 Subject: [PATCH 1/2] Bump controller-runtime from v0.21.0 to v0.22.4. Signed-off-by: RainbowMango --- go.mod | 2 +- go.sum | 4 +- .../federatedhpa_controller_test.go | 5 + .../endpointslice_dispatch_controller_test.go | 5 + vendor/modules.txt | 3 +- .../controller-runtime/.golangci.yml | 10 + .../controller-runtime/OWNERS_ALIASES | 4 +- .../sigs.k8s.io/controller-runtime/README.md | 6 +- .../controller-runtime/VERSIONING.md | 10 + .../controller-runtime/pkg/builder/webhook.go | 1 + .../controller-runtime/pkg/cache/cache.go | 93 +++- .../pkg/cache/internal/cache_reader.go | 9 +- .../pkg/cache/internal/informers.go | 2 +- .../pkg/certwatcher/certwatcher.go | 25 +- .../pkg/client/apiutil/apimachinery.go | 20 +- .../pkg/client/applyconfigurations.go | 75 +++ .../controller-runtime/pkg/client/client.go | 13 +- .../pkg/client/client_rest_resources.go | 105 +++- .../controller-runtime/pkg/client/dryrun.go | 4 + .../pkg/client/fake/client.go | 475 +++++++++++++++--- .../pkg/client/fake/typeconverter.go | 60 +++ .../pkg/client/fieldowner.go | 4 + .../pkg/client/fieldvalidation.go | 4 + .../pkg/client/interceptor/intercept.go | 9 + .../pkg/client/interfaces.go | 3 + .../pkg/client/namespaced_client.go | 56 ++- .../controller-runtime/pkg/client/options.go | 127 ++++- .../controller-runtime/pkg/client/patch.go | 5 + .../pkg/client/typed_client.go | 57 ++- .../pkg/client/unstructured_client.go | 57 ++- .../pkg/config/controller.go | 23 +- .../pkg/controller/controller.go | 37 +- .../controller/priorityqueue/priorityqueue.go | 139 +++-- .../pkg/handler/enqueue_mapped.go | 3 +- .../pkg/handler/eventhandler.go | 70 +-- .../pkg/internal/controller/controller.go | 275 +++++++--- .../pkg/leaderelection/leader_election.go | 32 +- .../pkg/manager/internal.go | 17 + .../controller-runtime/pkg/manager/manager.go | 21 +- .../pkg/manager/runnable_group.go | 60 ++- .../controller-runtime/pkg/manager/server.go | 2 +- .../pkg/predicate/predicate.go | 25 +- .../pkg/webhook/conversion/conversion.go | 21 +- .../pkg/webhook/conversion/metrics/metrics.go | 39 ++ 44 files changed, 1638 insertions(+), 379 deletions(-) create mode 100644 vendor/sigs.k8s.io/controller-runtime/pkg/client/applyconfigurations.go create mode 100644 vendor/sigs.k8s.io/controller-runtime/pkg/client/fake/typeconverter.go create mode 100644 vendor/sigs.k8s.io/controller-runtime/pkg/webhook/conversion/metrics/metrics.go diff --git a/go.mod b/go.mod index c453477f5f9a..0c10fdb8c1b4 100644 --- a/go.mod +++ b/go.mod @@ -55,7 +55,7 @@ require ( k8s.io/utils v0.0.0-20250604170112-4c0f3b243397 layeh.com/gopher-json v0.0.0-20201124131017-552bb3c4c3bf sigs.k8s.io/cluster-api v1.7.1 - sigs.k8s.io/controller-runtime v0.21.0 + sigs.k8s.io/controller-runtime v0.22.4 sigs.k8s.io/custom-metrics-apiserver v1.33.0 sigs.k8s.io/kind v0.29.0 sigs.k8s.io/mcs-api v0.1.0 diff --git a/go.sum b/go.sum index 80e5bf29e29b..177c02c74b48 100644 --- a/go.sum +++ b/go.sum @@ -915,8 +915,8 @@ sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.31.2/go.mod h1:Ve9uj1 sigs.k8s.io/cluster-api v1.7.1 h1:JkMAbAMzBM+WBHxXLTJXTiCisv1PAaHRzld/3qrmLYY= sigs.k8s.io/cluster-api v1.7.1/go.mod h1:V9ZhKLvQtsDODwjXOKgbitjyCmC71yMBwDcMyNNIov0= sigs.k8s.io/controller-runtime v0.6.1/go.mod h1:XRYBPdbf5XJu9kpS84VJiZ7h/u1hF3gEORz0efEja7A= -sigs.k8s.io/controller-runtime v0.21.0 h1:CYfjpEuicjUecRk+KAeyYh+ouUBn4llGyDYytIGcJS8= -sigs.k8s.io/controller-runtime 
v0.21.0/go.mod h1:OSg14+F65eWqIu4DceX7k/+QRAbTTvxeQSNSOQpukWM= +sigs.k8s.io/controller-runtime v0.22.4 h1:GEjV7KV3TY8e+tJ2LCTxUTanW4z/FmNB7l327UfMq9A= +sigs.k8s.io/controller-runtime v0.22.4/go.mod h1:+QX1XUpTXN4mLoblf4tqr5CQcyHPAki2HLXqQMY6vh8= sigs.k8s.io/controller-tools v0.3.0/go.mod h1:enhtKGfxZD1GFEoMgP8Fdbu+uKQ/cq1/WGJhdVChfvI= sigs.k8s.io/custom-metrics-apiserver v1.33.0 h1:9uHgBT8ah8PZG+iMAhkXH25kIOmWs7JcrasFA0d83rI= sigs.k8s.io/custom-metrics-apiserver v1.33.0/go.mod h1:7lxzUW4z5aEaXOtqOdzh8+pzJ/gT/E3B6LooxIP3tKc= diff --git a/pkg/controllers/federatedhpa/federatedhpa_controller_test.go b/pkg/controllers/federatedhpa/federatedhpa_controller_test.go index f23c8818146b..601a215f5d3d 100644 --- a/pkg/controllers/federatedhpa/federatedhpa_controller_test.go +++ b/pkg/controllers/federatedhpa/federatedhpa_controller_test.go @@ -58,6 +58,11 @@ func (m *MockClient) List(ctx context.Context, list client.ObjectList, opts ...c return args.Error(0) } +func (m *MockClient) Apply(ctx context.Context, obj runtime.ApplyConfiguration, opts ...client.ApplyOption) error { + args := m.Called(ctx, obj, opts) + return args.Error(0) +} + func (m *MockClient) Create(ctx context.Context, obj client.Object, opts ...client.CreateOption) error { args := m.Called(ctx, obj, opts) return args.Error(0) diff --git a/pkg/controllers/multiclusterservice/endpointslice_dispatch_controller_test.go b/pkg/controllers/multiclusterservice/endpointslice_dispatch_controller_test.go index 6a97113fe5b8..080bb3a38dfd 100644 --- a/pkg/controllers/multiclusterservice/endpointslice_dispatch_controller_test.go +++ b/pkg/controllers/multiclusterservice/endpointslice_dispatch_controller_test.go @@ -818,6 +818,11 @@ func (m *MockClient) List(ctx context.Context, list client.ObjectList, opts ...c return args.Error(0) } +func (m *MockClient) Apply(ctx context.Context, obj runtime.ApplyConfiguration, opts ...client.ApplyOption) error { + args := m.Called(ctx, obj, opts) + return args.Error(0) +} + func (m *MockClient) Create(ctx context.Context, obj client.Object, opts ...client.CreateOption) error { args := m.Called(ctx, obj, opts) return args.Error(0) diff --git a/vendor/modules.txt b/vendor/modules.txt index 7df1d06dcf66..ce0928bc80d7 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -1811,7 +1811,7 @@ sigs.k8s.io/cluster-api/errors sigs.k8s.io/cluster-api/feature sigs.k8s.io/cluster-api/util/certs sigs.k8s.io/cluster-api/util/secret -# sigs.k8s.io/controller-runtime v0.21.0 +# sigs.k8s.io/controller-runtime v0.22.4 ## explicit; go 1.24.0 sigs.k8s.io/controller-runtime sigs.k8s.io/controller-runtime/pkg/builder @@ -1859,6 +1859,7 @@ sigs.k8s.io/controller-runtime/pkg/webhook sigs.k8s.io/controller-runtime/pkg/webhook/admission sigs.k8s.io/controller-runtime/pkg/webhook/admission/metrics sigs.k8s.io/controller-runtime/pkg/webhook/conversion +sigs.k8s.io/controller-runtime/pkg/webhook/conversion/metrics sigs.k8s.io/controller-runtime/pkg/webhook/internal/metrics # sigs.k8s.io/custom-metrics-apiserver v1.33.0 ## explicit; go 1.24.0 diff --git a/vendor/sigs.k8s.io/controller-runtime/.golangci.yml b/vendor/sigs.k8s.io/controller-runtime/.golangci.yml index 7390d2024b13..1741432a013d 100644 --- a/vendor/sigs.k8s.io/controller-runtime/.golangci.yml +++ b/vendor/sigs.k8s.io/controller-runtime/.golangci.yml @@ -17,6 +17,7 @@ linters: - errchkjson - errorlint - exhaustive + - forbidigo - ginkgolinter - goconst - gocritic @@ -39,6 +40,12 @@ linters: - unused - whitespace settings: + forbidigo: + forbid: + - pattern: 
context.Background + msg: Use ginkgos SpecContext or go testings t.Context instead + - pattern: context.TODO + msg: Use ginkgos SpecContext or go testings t.Context instead govet: disable: - fieldalignment @@ -94,6 +101,9 @@ linters: - zz_generated.*\.go$ - .*conversion.*\.go$ rules: + - linters: + - forbidigo + path-except: _test\.go - linters: - gosec text: 'G108: Profiling endpoint is automatically exposed on /debug/pprof' diff --git a/vendor/sigs.k8s.io/controller-runtime/OWNERS_ALIASES b/vendor/sigs.k8s.io/controller-runtime/OWNERS_ALIASES index 5f5b2b66d572..47bf6eedf3c5 100644 --- a/vendor/sigs.k8s.io/controller-runtime/OWNERS_ALIASES +++ b/vendor/sigs.k8s.io/controller-runtime/OWNERS_ALIASES @@ -4,8 +4,10 @@ aliases: # active folks who can be contacted to perform admin-related # tasks on the repo, or otherwise approve any PRS. controller-runtime-admins: - - vincepri + - alvaroaleman - joelanford + - sbueringer + - vincepri # non-admin folks who have write-access and can approve any PRs in the repo controller-runtime-maintainers: diff --git a/vendor/sigs.k8s.io/controller-runtime/README.md b/vendor/sigs.k8s.io/controller-runtime/README.md index 20f7fd817b81..54bacad42ec5 100644 --- a/vendor/sigs.k8s.io/controller-runtime/README.md +++ b/vendor/sigs.k8s.io/controller-runtime/README.md @@ -25,9 +25,9 @@ The full documentation can be found at [VERSIONING.md](VERSIONING.md), but TL;DR Users: -- We follow [Semantic Versioning (semver)](https://semver.org) -- Use releases with your dependency management to ensure that you get compatible code -- The main branch contains all the latest code, some of which may break compatibility (so "normal" `go get` is not recommended) +- We stick to a zero major version +- We publish a minor version for each Kubernetes minor release and allow breaking changes between minor versions +- We publish patch versions as needed and we don't allow breaking changes in them Contributors: diff --git a/vendor/sigs.k8s.io/controller-runtime/VERSIONING.md b/vendor/sigs.k8s.io/controller-runtime/VERSIONING.md index 2c0f2f9b2d47..7ad6b142cc47 100644 --- a/vendor/sigs.k8s.io/controller-runtime/VERSIONING.md +++ b/vendor/sigs.k8s.io/controller-runtime/VERSIONING.md @@ -7,6 +7,16 @@ For the purposes of the aforementioned guidelines, controller-runtime counts as a "library project", but otherwise follows the guidelines exactly. +We stick to a major version of zero and create a minor version for +each Kubernetes minor version and we allow breaking changes in our +minor versions. We create patch releases as needed and don't allow +breaking changes in them. + +Publishing a non-zero major version is pointless for us, as the k8s.io/* +libraries we heavily depend on do breaking changes but use the same +versioning scheme as described above. Consequently, a project can only +ever depend on one controller-runtime version. + [guidelines]: https://sigs.k8s.io/kubebuilder-release-tools/VERSIONING.md ## Compatibility and Release Support diff --git a/vendor/sigs.k8s.io/controller-runtime/pkg/builder/webhook.go b/vendor/sigs.k8s.io/controller-runtime/pkg/builder/webhook.go index 8ec6d58fda92..6263f030a03c 100644 --- a/vendor/sigs.k8s.io/controller-runtime/pkg/builder/webhook.go +++ b/vendor/sigs.k8s.io/controller-runtime/pkg/builder/webhook.go @@ -98,6 +98,7 @@ func (blder *WebhookBuilder) RecoverPanic(recoverPanic bool) *WebhookBuilder { } // WithCustomPath overrides the webhook's default path by the customPath +// // Deprecated: WithCustomPath should not be used anymore. 
// Please use WithValidatorCustomPath or WithDefaulterCustomPath instead. func (blder *WebhookBuilder) WithCustomPath(customPath string) *WebhookBuilder { diff --git a/vendor/sigs.k8s.io/controller-runtime/pkg/cache/cache.go b/vendor/sigs.k8s.io/controller-runtime/pkg/cache/cache.go index 648d0d75b407..a94ec6cc3210 100644 --- a/vendor/sigs.k8s.io/controller-runtime/pkg/cache/cache.go +++ b/vendor/sigs.k8s.io/controller-runtime/pkg/cache/cache.go @@ -172,6 +172,15 @@ type Options struct { // is "done" with an object, and would otherwise not requeue it, i.e., we // recommend the `Reconcile` function return `reconcile.Result{RequeueAfter: t}`, // instead of `reconcile.Result{}`. + // + // SyncPeriod will locally trigger an artificial Update event with the same + // object in both ObjectOld and ObjectNew for everything that is in the + // cache. + // + // Predicates or Handlers that expect ObjectOld and ObjectNew to be different + // (such as GenerationChangedPredicate) will filter out this event, preventing + // it from triggering a reconciliation. + // SyncPeriod does not sync between the local cache and the server. SyncPeriod *time.Duration // ReaderFailOnMissingInformer configures the cache to return a ErrResourceNotCached error when a user @@ -299,6 +308,42 @@ type ByObject struct { // // Defaults to true. EnableWatchBookmarks *bool + + // SyncPeriod determines the minimum frequency at which watched resources are + // reconciled. A lower period will correct entropy more quickly, but reduce + // responsiveness to change if there are many watched resources. Change this + // value only if you know what you are doing. Defaults to 10 hours if unset. + // there will a 10 percent jitter between the SyncPeriod of all controllers + // so that all controllers will not send list requests simultaneously. + // + // This applies to all controllers. + // + // A period sync happens for two reasons: + // 1. To insure against a bug in the controller that causes an object to not + // be requeued, when it otherwise should be requeued. + // 2. To insure against an unknown bug in controller-runtime, or its dependencies, + // that causes an object to not be requeued, when it otherwise should be + // requeued, or to be removed from the queue, when it otherwise should not + // be removed. + // + // If you want + // 1. to insure against missed watch events, or + // 2. to poll services that cannot be watched, + // then we recommend that, instead of changing the default period, the + // controller requeue, with a constant duration `t`, whenever the controller + // is "done" with an object, and would otherwise not requeue it, i.e., we + // recommend the `Reconcile` function return `reconcile.Result{RequeueAfter: t}`, + // instead of `reconcile.Result{}`. + // + // SyncPeriod will locally trigger an artificial Update event with the same + // object in both ObjectOld and ObjectNew for everything that is in the + // cache. + // + // Predicates or Handlers that expect ObjectOld and ObjectNew to be different + // (such as GenerationChangedPredicate) will filter out this event, preventing + // it from triggering a reconciliation. + // SyncPeriod does not sync between the local cache and the server. + SyncPeriod *time.Duration } // Config describes all potential options for a given watch. @@ -334,6 +379,42 @@ type Config struct { // // Defaults to true. EnableWatchBookmarks *bool + + // SyncPeriod determines the minimum frequency at which watched resources are + // reconciled. 
A lower period will correct entropy more quickly, but reduce + // responsiveness to change if there are many watched resources. Change this + // value only if you know what you are doing. Defaults to 10 hours if unset. + // there will a 10 percent jitter between the SyncPeriod of all controllers + // so that all controllers will not send list requests simultaneously. + // + // This applies to all controllers. + // + // A period sync happens for two reasons: + // 1. To insure against a bug in the controller that causes an object to not + // be requeued, when it otherwise should be requeued. + // 2. To insure against an unknown bug in controller-runtime, or its dependencies, + // that causes an object to not be requeued, when it otherwise should be + // requeued, or to be removed from the queue, when it otherwise should not + // be removed. + // + // If you want + // 1. to insure against missed watch events, or + // 2. to poll services that cannot be watched, + // then we recommend that, instead of changing the default period, the + // controller requeue, with a constant duration `t`, whenever the controller + // is "done" with an object, and would otherwise not requeue it, i.e., we + // recommend the `Reconcile` function return `reconcile.Result{RequeueAfter: t}`, + // instead of `reconcile.Result{}`. + // + // SyncPeriod will locally trigger an artificial Update event with the same + // object in both ObjectOld and ObjectNew for everything that is in the + // cache. + // + // Predicates or Handlers that expect ObjectOld and ObjectNew to be different + // (such as GenerationChangedPredicate) will filter out this event, preventing + // it from triggering a reconciliation. + // SyncPeriod does not sync between the local cache and the server. + SyncPeriod *time.Duration } // NewCacheFunc - Function for creating a new cache from the options and a rest config. 
@@ -404,6 +485,7 @@ func optionDefaultsToConfig(opts *Options) Config { Transform: opts.DefaultTransform, UnsafeDisableDeepCopy: opts.DefaultUnsafeDisableDeepCopy, EnableWatchBookmarks: opts.DefaultEnableWatchBookmarks, + SyncPeriod: opts.SyncPeriod, } } @@ -414,6 +496,7 @@ func byObjectToConfig(byObject ByObject) Config { Transform: byObject.Transform, UnsafeDisableDeepCopy: byObject.UnsafeDisableDeepCopy, EnableWatchBookmarks: byObject.EnableWatchBookmarks, + SyncPeriod: byObject.SyncPeriod, } } @@ -427,7 +510,7 @@ func newCache(restConfig *rest.Config, opts Options) newCacheFunc { HTTPClient: opts.HTTPClient, Scheme: opts.Scheme, Mapper: opts.Mapper, - ResyncPeriod: *opts.SyncPeriod, + ResyncPeriod: ptr.Deref(config.SyncPeriod, defaultSyncPeriod), Namespace: namespace, Selector: internal.Selector{ Label: config.LabelSelector, @@ -525,6 +608,7 @@ func defaultOpts(config *rest.Config, opts Options) (Options, error) { byObject.Transform = defaultedConfig.Transform byObject.UnsafeDisableDeepCopy = defaultedConfig.UnsafeDisableDeepCopy byObject.EnableWatchBookmarks = defaultedConfig.EnableWatchBookmarks + byObject.SyncPeriod = defaultedConfig.SyncPeriod } opts.ByObject[obj] = byObject @@ -546,10 +630,6 @@ func defaultOpts(config *rest.Config, opts Options) (Options, error) { opts.DefaultNamespaces[namespace] = cfg } - // Default the resync period to 10 hours if unset - if opts.SyncPeriod == nil { - opts.SyncPeriod = &defaultSyncPeriod - } return opts, nil } @@ -569,6 +649,9 @@ func defaultConfig(toDefault, defaultFrom Config) Config { if toDefault.EnableWatchBookmarks == nil { toDefault.EnableWatchBookmarks = defaultFrom.EnableWatchBookmarks } + if toDefault.SyncPeriod == nil { + toDefault.SyncPeriod = defaultFrom.SyncPeriod + } return toDefault } diff --git a/vendor/sigs.k8s.io/controller-runtime/pkg/cache/internal/cache_reader.go b/vendor/sigs.k8s.io/controller-runtime/pkg/cache/internal/cache_reader.go index 33ce8a830aa4..eb6b5448559f 100644 --- a/vendor/sigs.k8s.io/controller-runtime/pkg/cache/internal/cache_reader.go +++ b/vendor/sigs.k8s.io/controller-runtime/pkg/cache/internal/cache_reader.go @@ -54,7 +54,10 @@ type CacheReader struct { } // Get checks the indexer for the object and writes a copy of it if found. 
-func (c *CacheReader) Get(_ context.Context, key client.ObjectKey, out client.Object, _ ...client.GetOption) error { +func (c *CacheReader) Get(_ context.Context, key client.ObjectKey, out client.Object, opts ...client.GetOption) error { + getOpts := client.GetOptions{} + getOpts.ApplyOptions(opts) + if c.scopeName == apimeta.RESTScopeNameRoot { key.Namespace = "" } @@ -81,7 +84,7 @@ func (c *CacheReader) Get(_ context.Context, key client.ObjectKey, out client.Ob return fmt.Errorf("cache contained %T, which is not an Object", obj) } - if c.disableDeepCopy { + if c.disableDeepCopy || (getOpts.UnsafeDisableDeepCopy != nil && *getOpts.UnsafeDisableDeepCopy) { // skip deep copy which might be unsafe // you must DeepCopy any object before mutating it outside } else { @@ -97,7 +100,7 @@ func (c *CacheReader) Get(_ context.Context, key client.ObjectKey, out client.Ob return fmt.Errorf("cache had type %s, but %s was asked for", objVal.Type(), outVal.Type()) } reflect.Indirect(outVal).Set(reflect.Indirect(objVal)) - if !c.disableDeepCopy { + if !c.disableDeepCopy && (getOpts.UnsafeDisableDeepCopy == nil || !*getOpts.UnsafeDisableDeepCopy) { out.GetObjectKind().SetGroupVersionKind(c.groupVersionKind) } diff --git a/vendor/sigs.k8s.io/controller-runtime/pkg/cache/internal/informers.go b/vendor/sigs.k8s.io/controller-runtime/pkg/cache/internal/informers.go index 4bf832b2d9d8..f216be0d9e53 100644 --- a/vendor/sigs.k8s.io/controller-runtime/pkg/cache/internal/informers.go +++ b/vendor/sigs.k8s.io/controller-runtime/pkg/cache/internal/informers.go @@ -518,7 +518,7 @@ func (ip *Informers) makeListWatcher(gvk schema.GroupVersionKind, obj runtime.Ob // Structured. // default: - client, err := apiutil.RESTClientForGVK(gvk, false, ip.config, ip.codecs, ip.httpClient) + client, err := apiutil.RESTClientForGVK(gvk, false, false, ip.config, ip.codecs, ip.httpClient) if err != nil { return nil, err } diff --git a/vendor/sigs.k8s.io/controller-runtime/pkg/certwatcher/certwatcher.go b/vendor/sigs.k8s.io/controller-runtime/pkg/certwatcher/certwatcher.go index c32324098234..2362d020b839 100644 --- a/vendor/sigs.k8s.io/controller-runtime/pkg/certwatcher/certwatcher.go +++ b/vendor/sigs.k8s.io/controller-runtime/pkg/certwatcher/certwatcher.go @@ -26,6 +26,7 @@ import ( "time" "github.com/fsnotify/fsnotify" + "github.com/go-logr/logr" kerrors "k8s.io/apimachinery/pkg/util/errors" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/wait" @@ -47,6 +48,7 @@ type CertWatcher struct { currentCert *tls.Certificate watcher *fsnotify.Watcher interval time.Duration + log logr.Logger certPath string keyPath string @@ -65,6 +67,7 @@ func New(certPath, keyPath string) (*CertWatcher, error) { certPath: certPath, keyPath: keyPath, interval: defaultWatchInterval, + log: log.WithValues("cert", certPath, "key", keyPath), } // Initial read of certificate and key. 
@@ -130,14 +133,14 @@ func (cw *CertWatcher) Start(ctx context.Context) error { ticker := time.NewTicker(cw.interval) defer ticker.Stop() - log.Info("Starting certificate poll+watcher", "interval", cw.interval) + cw.log.Info("Starting certificate poll+watcher", "interval", cw.interval) for { select { case <-ctx.Done(): return cw.watcher.Close() case <-ticker.C: if err := cw.ReadCertificate(); err != nil { - log.Error(err, "failed read certificate") + cw.log.Error(err, "failed read certificate") } } } @@ -160,7 +163,7 @@ func (cw *CertWatcher) Watch() { return } - log.Error(err, "certificate watch error") + cw.log.Error(err, "certificate watch error") } } } @@ -174,7 +177,7 @@ func (cw *CertWatcher) updateCachedCertificate(cert *tls.Certificate, keyPEMBloc if cw.currentCert != nil && bytes.Equal(cw.currentCert.Certificate[0], cert.Certificate[0]) && bytes.Equal(cw.cachedKeyPEMBlock, keyPEMBlock) { - log.V(7).Info("certificate already cached") + cw.log.V(7).Info("certificate already cached") return false } cw.currentCert = cert @@ -208,7 +211,7 @@ func (cw *CertWatcher) ReadCertificate() error { return nil } - log.Info("Updated current TLS certificate") + cw.log.Info("Updated current TLS certificate") // If a callback is registered, invoke it with the new certificate. cw.RLock() @@ -229,14 +232,20 @@ func (cw *CertWatcher) handleEvent(event fsnotify.Event) { case event.Op.Has(fsnotify.Chmod), event.Op.Has(fsnotify.Remove): // If the file was removed or renamed, re-add the watch to the previous name if err := cw.watcher.Add(event.Name); err != nil { - log.Error(err, "error re-watching file") + cw.log.Error(err, "error re-watching file") } default: return } - log.V(1).Info("certificate event", "event", event) + cw.log.V(1).Info("certificate event", "event", event) if err := cw.ReadCertificate(); err != nil { - log.Error(err, "error re-reading certificate") + cw.log.Error(err, "error re-reading certificate") } } + +// NeedLeaderElection indicates that the cert-manager +// does not need leader election. +func (cw *CertWatcher) NeedLeaderElection() bool { + return false +} diff --git a/vendor/sigs.k8s.io/controller-runtime/pkg/client/apiutil/apimachinery.go b/vendor/sigs.k8s.io/controller-runtime/pkg/client/apiutil/apimachinery.go index 1d4ce264c9bc..b132cb2d4d67 100644 --- a/vendor/sigs.k8s.io/controller-runtime/pkg/client/apiutil/apimachinery.go +++ b/vendor/sigs.k8s.io/controller-runtime/pkg/client/apiutil/apimachinery.go @@ -161,15 +161,27 @@ func GVKForObject(obj runtime.Object, scheme *runtime.Scheme) (schema.GroupVersi // RESTClientForGVK constructs a new rest.Interface capable of accessing the resource associated // with the given GroupVersionKind. The REST client will be configured to use the negotiated serializer from // baseConfig, if set, otherwise a default serializer will be set. 
-func RESTClientForGVK(gvk schema.GroupVersionKind, isUnstructured bool, baseConfig *rest.Config, codecs serializer.CodecFactory, httpClient *http.Client) (rest.Interface, error) { +func RESTClientForGVK( + gvk schema.GroupVersionKind, + forceDisableProtoBuf bool, + isUnstructured bool, + baseConfig *rest.Config, + codecs serializer.CodecFactory, + httpClient *http.Client, +) (rest.Interface, error) { if httpClient == nil { return nil, fmt.Errorf("httpClient must not be nil, consider using rest.HTTPClientFor(c) to create a client") } - return rest.RESTClientForConfigAndClient(createRestConfig(gvk, isUnstructured, baseConfig, codecs), httpClient) + return rest.RESTClientForConfigAndClient(createRestConfig(gvk, forceDisableProtoBuf, isUnstructured, baseConfig, codecs), httpClient) } // createRestConfig copies the base config and updates needed fields for a new rest config. -func createRestConfig(gvk schema.GroupVersionKind, isUnstructured bool, baseConfig *rest.Config, codecs serializer.CodecFactory) *rest.Config { +func createRestConfig(gvk schema.GroupVersionKind, + forceDisableProtoBuf bool, + isUnstructured bool, + baseConfig *rest.Config, + codecs serializer.CodecFactory, +) *rest.Config { gv := gvk.GroupVersion() cfg := rest.CopyConfig(baseConfig) @@ -183,7 +195,7 @@ func createRestConfig(gvk schema.GroupVersionKind, isUnstructured bool, baseConf cfg.UserAgent = rest.DefaultKubernetesUserAgent() } // TODO(FillZpp): In the long run, we want to check discovery or something to make sure that this is actually true. - if cfg.ContentType == "" && !isUnstructured { + if cfg.ContentType == "" && !forceDisableProtoBuf { protobufSchemeLock.RLock() if protobufScheme.Recognizes(gvk) { cfg.ContentType = runtime.ContentTypeProtobuf diff --git a/vendor/sigs.k8s.io/controller-runtime/pkg/client/applyconfigurations.go b/vendor/sigs.k8s.io/controller-runtime/pkg/client/applyconfigurations.go new file mode 100644 index 000000000000..97192050f9d3 --- /dev/null +++ b/vendor/sigs.k8s.io/controller-runtime/pkg/client/applyconfigurations.go @@ -0,0 +1,75 @@ +/* +Copyright 2025 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package client + +import ( + "fmt" + + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/utils/ptr" +) + +type unstructuredApplyConfiguration struct { + *unstructured.Unstructured +} + +func (u *unstructuredApplyConfiguration) IsApplyConfiguration() {} + +// ApplyConfigurationFromUnstructured creates a runtime.ApplyConfiguration from an *unstructured.Unstructured object. +// +// Do not use Unstructured objects here that were generated from API objects, as its impossible to tell +// if a zero value was explicitly set. 
+func ApplyConfigurationFromUnstructured(u *unstructured.Unstructured) runtime.ApplyConfiguration { + return &unstructuredApplyConfiguration{Unstructured: u} +} + +type applyconfigurationRuntimeObject struct { + runtime.ApplyConfiguration +} + +func (a *applyconfigurationRuntimeObject) GetObjectKind() schema.ObjectKind { + return a +} + +func (a *applyconfigurationRuntimeObject) GroupVersionKind() schema.GroupVersionKind { + return schema.GroupVersionKind{} +} + +func (a *applyconfigurationRuntimeObject) SetGroupVersionKind(gvk schema.GroupVersionKind) {} + +func (a *applyconfigurationRuntimeObject) DeepCopyObject() runtime.Object { + panic("applyconfigurationRuntimeObject does not support DeepCopyObject") +} + +func runtimeObjectFromApplyConfiguration(ac runtime.ApplyConfiguration) runtime.Object { + return &applyconfigurationRuntimeObject{ApplyConfiguration: ac} +} + +func gvkFromApplyConfiguration(ac applyConfiguration) (schema.GroupVersionKind, error) { + var gvk schema.GroupVersionKind + gv, err := schema.ParseGroupVersion(ptr.Deref(ac.GetAPIVersion(), "")) + if err != nil { + return gvk, fmt.Errorf("failed to parse %q as GroupVersion: %w", ptr.Deref(ac.GetAPIVersion(), ""), err) + } + gvk.Group = gv.Group + gvk.Version = gv.Version + gvk.Kind = ptr.Deref(ac.GetKind(), "") + + return gvk, nil +} diff --git a/vendor/sigs.k8s.io/controller-runtime/pkg/client/client.go b/vendor/sigs.k8s.io/controller-runtime/pkg/client/client.go index 50b0ebf33867..e9f731453b4f 100644 --- a/vendor/sigs.k8s.io/controller-runtime/pkg/client/client.go +++ b/vendor/sigs.k8s.io/controller-runtime/pkg/client/client.go @@ -151,8 +151,7 @@ func newClient(config *rest.Config, options Options) (*client, error) { mapper: options.Mapper, codecs: serializer.NewCodecFactory(options.Scheme), - structuredResourceByType: make(map[schema.GroupVersionKind]*resourceMeta), - unstructuredResourceByType: make(map[schema.GroupVersionKind]*resourceMeta), + resourceByType: make(map[cacheKey]*resourceMeta), } rawMetaClient, err := metadata.NewForConfigAndClient(metadata.ConfigFor(config), options.HTTPClient) @@ -329,6 +328,16 @@ func (c *client) Patch(ctx context.Context, obj Object, patch Patch, opts ...Pat } } +func (c *client) Apply(ctx context.Context, obj runtime.ApplyConfiguration, opts ...ApplyOption) error { + switch obj := obj.(type) { + case *unstructuredApplyConfiguration: + defer c.resetGroupVersionKind(obj, obj.GetObjectKind().GroupVersionKind()) + return c.unstructuredClient.Apply(ctx, obj, opts...) + default: + return c.typedClient.Apply(ctx, obj, opts...) + } +} + // Get implements client.Client. func (c *client) Get(ctx context.Context, key ObjectKey, obj Object, opts ...GetOption) error { if isUncached, err := c.shouldBypassCache(obj); err != nil { diff --git a/vendor/sigs.k8s.io/controller-runtime/pkg/client/client_rest_resources.go b/vendor/sigs.k8s.io/controller-runtime/pkg/client/client_rest_resources.go index 2d07879520b4..d75d685cbbb7 100644 --- a/vendor/sigs.k8s.io/controller-runtime/pkg/client/client_rest_resources.go +++ b/vendor/sigs.k8s.io/controller-runtime/pkg/client/client_rest_resources.go @@ -17,16 +17,17 @@ limitations under the License. 
package client import ( + "fmt" "net/http" "strings" "sync" "k8s.io/apimachinery/pkg/api/meta" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/runtime/serializer" "k8s.io/client-go/rest" + "k8s.io/utils/ptr" "sigs.k8s.io/controller-runtime/pkg/client/apiutil" ) @@ -47,22 +48,30 @@ type clientRestResources struct { // codecs are used to create a REST client for a gvk codecs serializer.CodecFactory - // structuredResourceByType stores structured type metadata - structuredResourceByType map[schema.GroupVersionKind]*resourceMeta - // unstructuredResourceByType stores unstructured type metadata - unstructuredResourceByType map[schema.GroupVersionKind]*resourceMeta - mu sync.RWMutex + // resourceByType stores type metadata + resourceByType map[cacheKey]*resourceMeta + + mu sync.RWMutex +} + +type cacheKey struct { + gvk schema.GroupVersionKind + forceDisableProtoBuf bool } // newResource maps obj to a Kubernetes Resource and constructs a client for that Resource. // If the object is a list, the resource represents the item's type instead. -func (c *clientRestResources) newResource(gvk schema.GroupVersionKind, isList, isUnstructured bool) (*resourceMeta, error) { +func (c *clientRestResources) newResource(gvk schema.GroupVersionKind, + isList bool, + forceDisableProtoBuf bool, + isUnstructured bool, +) (*resourceMeta, error) { if strings.HasSuffix(gvk.Kind, "List") && isList { // if this was a list, treat it as a request for the item's resource gvk.Kind = gvk.Kind[:len(gvk.Kind)-4] } - client, err := apiutil.RESTClientForGVK(gvk, isUnstructured, c.config, c.codecs, c.httpClient) + client, err := apiutil.RESTClientForGVK(gvk, forceDisableProtoBuf, isUnstructured, c.config, c.codecs, c.httpClient) if err != nil { return nil, err } @@ -73,52 +82,96 @@ func (c *clientRestResources) newResource(gvk schema.GroupVersionKind, isList, i return &resourceMeta{Interface: client, mapping: mapping, gvk: gvk}, nil } +type applyConfiguration interface { + GetName() *string + GetNamespace() *string + GetKind() *string + GetAPIVersion() *string +} + // getResource returns the resource meta information for the given type of object. // If the object is a list, the resource represents the item's type instead. 
-func (c *clientRestResources) getResource(obj runtime.Object) (*resourceMeta, error) { - gvk, err := apiutil.GVKForObject(obj, c.scheme) - if err != nil { - return nil, err +func (c *clientRestResources) getResource(obj any) (*resourceMeta, error) { + var gvk schema.GroupVersionKind + var err error + var isApplyConfiguration bool + switch o := obj.(type) { + case runtime.Object: + gvk, err = apiutil.GVKForObject(o, c.scheme) + if err != nil { + return nil, err + } + case runtime.ApplyConfiguration: + ac, ok := o.(applyConfiguration) + if !ok { + return nil, fmt.Errorf("%T is a runtime.ApplyConfiguration but not an applyConfiguration", o) + } + gvk, err = gvkFromApplyConfiguration(ac) + if err != nil { + return nil, err + } + isApplyConfiguration = true + default: + return nil, fmt.Errorf("bug: %T is neither a runtime.Object nor a runtime.ApplyConfiguration", o) } _, isUnstructured := obj.(runtime.Unstructured) + forceDisableProtoBuf := isUnstructured || isApplyConfiguration // It's better to do creation work twice than to not let multiple // people make requests at once c.mu.RLock() - resourceByType := c.structuredResourceByType - if isUnstructured { - resourceByType = c.unstructuredResourceByType - } - r, known := resourceByType[gvk] + + cacheKey := cacheKey{gvk: gvk, forceDisableProtoBuf: forceDisableProtoBuf} + + r, known := c.resourceByType[cacheKey] + c.mu.RUnlock() if known { return r, nil } + var isList bool + if runtimeObject, ok := obj.(runtime.Object); ok && meta.IsListType(runtimeObject) { + isList = true + } + // Initialize a new Client c.mu.Lock() defer c.mu.Unlock() - r, err = c.newResource(gvk, meta.IsListType(obj), isUnstructured) + r, err = c.newResource(gvk, isList, forceDisableProtoBuf, isUnstructured) if err != nil { return nil, err } - resourceByType[gvk] = r + c.resourceByType[cacheKey] = r return r, err } // getObjMeta returns objMeta containing both type and object metadata and state. -func (c *clientRestResources) getObjMeta(obj runtime.Object) (*objMeta, error) { +func (c *clientRestResources) getObjMeta(obj any) (*objMeta, error) { r, err := c.getResource(obj) if err != nil { return nil, err } - m, err := meta.Accessor(obj) - if err != nil { - return nil, err + objMeta := &objMeta{resourceMeta: r} + + switch o := obj.(type) { + case runtime.Object: + m, err := meta.Accessor(obj) + if err != nil { + return nil, err + } + objMeta.namespace = m.GetNamespace() + objMeta.name = m.GetName() + case applyConfiguration: + objMeta.namespace = ptr.Deref(o.GetNamespace(), "") + objMeta.name = ptr.Deref(o.GetName(), "") + default: + return nil, fmt.Errorf("object %T is neither a runtime.Object nor a runtime.ApplyConfiguration", obj) } - return &objMeta{resourceMeta: r, Object: m}, err + + return objMeta, nil } // resourceMeta stores state for a Kubernetes type. @@ -146,6 +199,6 @@ type objMeta struct { // resourceMeta contains type information for the object *resourceMeta - // Object contains meta data for the object instance - metav1.Object + namespace string + name string } diff --git a/vendor/sigs.k8s.io/controller-runtime/pkg/client/dryrun.go b/vendor/sigs.k8s.io/controller-runtime/pkg/client/dryrun.go index bbcdd3832153..a185860d33da 100644 --- a/vendor/sigs.k8s.io/controller-runtime/pkg/client/dryrun.go +++ b/vendor/sigs.k8s.io/controller-runtime/pkg/client/dryrun.go @@ -82,6 +82,10 @@ func (c *dryRunClient) Patch(ctx context.Context, obj Object, patch Patch, opts return c.client.Patch(ctx, obj, patch, append(opts, DryRunAll)...) 
} +func (c *dryRunClient) Apply(ctx context.Context, obj runtime.ApplyConfiguration, opts ...ApplyOption) error { + return c.client.Apply(ctx, obj, append(opts, DryRunAll)...) +} + // Get implements client.Client. func (c *dryRunClient) Get(ctx context.Context, key ObjectKey, obj Object, opts ...GetOption) error { return c.client.Get(ctx, key, obj, opts...) diff --git a/vendor/sigs.k8s.io/controller-runtime/pkg/client/fake/client.go b/vendor/sigs.k8s.io/controller-runtime/pkg/client/fake/client.go index 16e2cba51206..f88a44edd2c4 100644 --- a/vendor/sigs.k8s.io/controller-runtime/pkg/client/fake/client.go +++ b/vendor/sigs.k8s.io/controller-runtime/pkg/client/fake/client.go @@ -41,6 +41,7 @@ import ( https://github.com/kubernetes/kubernetes/pull/120326 (v5.6.0+incompatible missing a critical fix) */ + jsonpatch "gopkg.in/evanphx/json-patch.v4" appsv1 "k8s.io/api/apps/v1" authenticationv1 "k8s.io/api/authentication/v1" @@ -52,17 +53,21 @@ import ( "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/apis/meta/v1/validation" "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/runtime/serializer" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/json" + "k8s.io/apimachinery/pkg/util/managedfields" utilrand "k8s.io/apimachinery/pkg/util/rand" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/strategicpatch" "k8s.io/apimachinery/pkg/util/validation/field" "k8s.io/apimachinery/pkg/watch" + clientgoapplyconfigurations "k8s.io/client-go/applyconfigurations" "k8s.io/client-go/kubernetes/scheme" "k8s.io/client-go/testing" "k8s.io/utils/ptr" @@ -76,8 +81,9 @@ import ( type versionedTracker struct { testing.ObjectTracker - scheme *runtime.Scheme - withStatusSubresource sets.Set[schema.GroupVersionKind] + scheme *runtime.Scheme + withStatusSubresource sets.Set[schema.GroupVersionKind] + usesFieldManagedObjectTracker bool } type fakeClient struct { @@ -98,6 +104,8 @@ type fakeClient struct { indexes map[schema.GroupVersionKind]map[string]client.IndexerFunc // indexesLock must be held when accessing indexes. indexesLock sync.RWMutex + + returnManagedFields bool } var _ client.WithWatch = &fakeClient{} @@ -131,6 +139,9 @@ type ClientBuilder struct { withStatusSubresource []client.Object objectTracker testing.ObjectTracker interceptorFuncs *interceptor.Funcs + typeConverters []managedfields.TypeConverter + returnManagedFields bool + isBuilt bool // indexes maps each GroupVersionKind (GVK) to the indexes registered for that GVK. // The inner map maps from index name to IndexerFunc. @@ -172,6 +183,8 @@ func (f *ClientBuilder) WithRuntimeObjects(initRuntimeObjs ...runtime.Object) *C } // WithObjectTracker can be optionally used to initialize this fake client with testing.ObjectTracker. +// Setting this is incompatible with setting WithTypeConverters, as they are a setting on the +// tracker. func (f *ClientBuilder) WithObjectTracker(ot testing.ObjectTracker) *ClientBuilder { f.objectTracker = ot return f @@ -228,8 +241,36 @@ func (f *ClientBuilder) WithInterceptorFuncs(interceptorFuncs interceptor.Funcs) return f } +// WithTypeConverters sets the type converters for the fake client. The list is ordered and the first +// non-erroring converter is used. A type converter must be provided for all types the client is used +// for, otherwise it will error. 
+// +// This setting is incompatible with WithObjectTracker, as the type converters are a setting on the tracker. +// +// If unset, this defaults to: +// * clientgoapplyconfigurations.NewTypeConverter(scheme.Scheme), +// * managedfields.NewDeducedTypeConverter(), +// +// Be aware that the behavior of the `NewDeducedTypeConverter` might not match the behavior of the +// Kubernetes APIServer, it is recommended to provide a type converter for your types. TypeConverters +// are generated along with ApplyConfigurations. +func (f *ClientBuilder) WithTypeConverters(typeConverters ...managedfields.TypeConverter) *ClientBuilder { + f.typeConverters = append(f.typeConverters, typeConverters...) + return f +} + +// WithReturnManagedFields configures the fake client to return managedFields +// on objects. +func (f *ClientBuilder) WithReturnManagedFields() *ClientBuilder { + f.returnManagedFields = true + return f +} + // Build builds and returns a new fake client. func (f *ClientBuilder) Build() client.WithWatch { + if f.isBuilt { + panic("Build() must not be called multiple times when creating a ClientBuilder") + } if f.scheme == nil { f.scheme = scheme.Scheme } @@ -237,8 +278,6 @@ func (f *ClientBuilder) Build() client.WithWatch { f.restMapper = meta.NewDefaultRESTMapper([]schema.GroupVersion{}) } - var tracker versionedTracker - withStatusSubResource := sets.New(inTreeResourcesWithStatus()...) for _, o := range f.withStatusSubresource { gvk, err := apiutil.GVKForObject(o, f.scheme) @@ -248,10 +287,36 @@ func (f *ClientBuilder) Build() client.WithWatch { withStatusSubResource.Insert(gvk) } + if f.objectTracker != nil && len(f.typeConverters) > 0 { + panic(errors.New("WithObjectTracker and WithTypeConverters are incompatible")) + } + + var usesFieldManagedObjectTracker bool if f.objectTracker == nil { - tracker = versionedTracker{ObjectTracker: testing.NewObjectTracker(f.scheme, scheme.Codecs.UniversalDecoder()), scheme: f.scheme, withStatusSubresource: withStatusSubResource} - } else { - tracker = versionedTracker{ObjectTracker: f.objectTracker, scheme: f.scheme, withStatusSubresource: withStatusSubResource} + if len(f.typeConverters) == 0 { + // Use corresponding scheme to ensure the converter error + // for types it can't handle. 
+ clientGoScheme := runtime.NewScheme() + if err := scheme.AddToScheme(clientGoScheme); err != nil { + panic(fmt.Sprintf("failed to construct client-go scheme: %v", err)) + } + f.typeConverters = []managedfields.TypeConverter{ + clientgoapplyconfigurations.NewTypeConverter(clientGoScheme), + managedfields.NewDeducedTypeConverter(), + } + } + f.objectTracker = testing.NewFieldManagedObjectTracker( + f.scheme, + serializer.NewCodecFactory(f.scheme).UniversalDecoder(), + multiTypeConverter{upstream: f.typeConverters}, + ) + usesFieldManagedObjectTracker = true + } + tracker := versionedTracker{ + ObjectTracker: f.objectTracker, + scheme: f.scheme, + withStatusSubresource: withStatusSubResource, + usesFieldManagedObjectTracker: usesFieldManagedObjectTracker, } for _, obj := range f.initObject { @@ -276,12 +341,14 @@ func (f *ClientBuilder) Build() client.WithWatch { restMapper: f.restMapper, indexes: f.indexes, withStatusSubresource: withStatusSubResource, + returnManagedFields: f.returnManagedFields, } if f.interceptorFuncs != nil { result = interceptor.NewClient(result, *f.interceptorFuncs) } + f.isBuilt = true return result } @@ -318,6 +385,16 @@ func (t versionedTracker) Add(obj runtime.Object) error { if err != nil { return err } + + // If the fieldManager can not decode fields, it will just silently clear them. This is pretty + // much guaranteed not to be what someone that initializes a fake client with objects that + // have them set wants, so validate them here. + // Ref https://github.com/kubernetes/kubernetes/blob/a956ef4862993b825bcd524a19260192ff1da72d/staging/src/k8s.io/apimachinery/pkg/util/managedfields/internal/fieldmanager.go#L105 + if t.usesFieldManagedObjectTracker { + if err := managedfields.ValidateManagedFields(accessor.GetManagedFields()); err != nil { + return fmt.Errorf("invalid managedFields on %T: %w", obj, err) + } + } if err := t.ObjectTracker.Add(obj); err != nil { return err } @@ -332,8 +409,9 @@ func (t versionedTracker) Create(gvr schema.GroupVersionResource, obj runtime.Ob return fmt.Errorf("failed to get accessor for object: %w", err) } if accessor.GetName() == "" { + gvk, _ := apiutil.GVKForObject(obj, t.scheme) return apierrors.NewInvalid( - obj.GetObjectKind().GroupVersionKind().GroupKind(), + gvk.GroupKind(), accessor.GetName(), field.ErrorList{field.Required(field.NewPath("metadata.name"), "name is required")}) } @@ -372,6 +450,9 @@ func convertFromUnstructuredIfNecessary(s *runtime.Scheme, o runtime.Object) (ru if err != nil { return nil, fmt.Errorf("scheme recognizes %s but failed to produce an object for it: %w", gvk, err) } + if _, isTypedUnstructured := typed.(runtime.Unstructured); isTypedUnstructured { + return o, nil + } unstructuredSerialized, err := json.Marshal(u) if err != nil { @@ -394,7 +475,11 @@ func (t versionedTracker) Update(gvr schema.GroupVersionResource, obj runtime.Ob } func (t versionedTracker) update(gvr schema.GroupVersionResource, obj runtime.Object, ns string, isStatus, deleting bool, opts metav1.UpdateOptions) error { - obj, err := t.updateObject(gvr, obj, ns, isStatus, deleting, opts.DryRun) + gvk, err := apiutil.GVKForObject(obj, t.scheme) + if err != nil { + return err + } + obj, err = t.updateObject(gvr, obj, ns, isStatus, deleting, opts.DryRun) if err != nil { return err } @@ -402,6 +487,10 @@ func (t versionedTracker) update(gvr schema.GroupVersionResource, obj runtime.Ob return nil } + if u, unstructured := obj.(*unstructured.Unstructured); unstructured { + u.SetGroupVersionKind(gvk) + } + return 
t.ObjectTracker.Update(gvr, obj, ns, opts) } @@ -433,8 +522,9 @@ func (t versionedTracker) updateObject(gvr schema.GroupVersionResource, obj runt } if accessor.GetName() == "" { + gvk, _ := apiutil.GVKForObject(obj, t.scheme) return nil, apierrors.NewInvalid( - obj.GetObjectKind().GroupVersionKind().GroupKind(), + gvk.GroupKind(), accessor.GetName(), field.ErrorList{field.Required(field.NewPath("metadata.name"), "name is required")}) } @@ -521,42 +611,60 @@ func (t versionedTracker) updateObject(gvr schema.GroupVersionResource, obj runt } func (c *fakeClient) Get(ctx context.Context, key client.ObjectKey, obj client.Object, opts ...client.GetOption) error { + if err := c.addToSchemeIfUnknownAndUnstructuredOrPartial(obj); err != nil { + return err + } + c.schemeLock.RLock() defer c.schemeLock.RUnlock() gvr, err := getGVRFromObject(obj, c.scheme) if err != nil { return err } + gvk, err := apiutil.GVKForObject(obj, c.scheme) + if err != nil { + return err + } o, err := c.tracker.Get(gvr, key.Namespace, key.Name) if err != nil { return err } - _, isUnstructured := obj.(runtime.Unstructured) - _, isPartialObject := obj.(*metav1.PartialObjectMetadata) - - if isUnstructured || isPartialObject { - gvk, err := apiutil.GVKForObject(obj, c.scheme) - if err != nil { - return err - } - ta, err := meta.TypeAccessor(o) - if err != nil { - return err - } - ta.SetKind(gvk.Kind) - ta.SetAPIVersion(gvk.GroupVersion().String()) + ta, err := meta.TypeAccessor(o) + if err != nil { + return err } + // If the final object is unstructuctured, the json + // representation must contain GVK or the apimachinery + // json serializer will error out. + ta.SetAPIVersion(gvk.GroupVersion().String()) + ta.SetKind(gvk.Kind) + j, err := json.Marshal(o) if err != nil { return err } zero(obj) - return json.Unmarshal(j, obj) + if err := json.Unmarshal(j, obj); err != nil { + return err + } + + if !c.returnManagedFields { + obj.SetManagedFields(nil) + } + + return ensureTypeMeta(obj, gvk) } func (c *fakeClient) Watch(ctx context.Context, list client.ObjectList, opts ...client.ListOption) (watch.Interface, error) { + if err := c.addToSchemeIfUnknownAndUnstructuredOrPartial(list); err != nil { + return nil, err + } + + c.schemeLock.RLock() + defer c.schemeLock.RUnlock() + gvk, err := apiutil.GVKForObject(list, c.scheme) if err != nil { return nil, err @@ -572,6 +680,10 @@ func (c *fakeClient) Watch(ctx context.Context, list client.ObjectList, opts ... 
} func (c *fakeClient) List(ctx context.Context, obj client.ObjectList, opts ...client.ListOption) error { + if err := c.addToSchemeIfUnknownAndUnstructuredOrPartial(obj); err != nil { + return err + } + c.schemeLock.RLock() defer c.schemeLock.RUnlock() gvk, err := apiutil.GVKForObject(obj, c.scheme) @@ -579,11 +691,12 @@ func (c *fakeClient) List(ctx context.Context, obj client.ObjectList, opts ...cl return err } - originalKind := gvk.Kind - + originalGVK := gvk gvk.Kind = strings.TrimSuffix(gvk.Kind, "List") + listGVK := gvk + listGVK.Kind += "List" - if _, isUnstructuredList := obj.(runtime.Unstructured); isUnstructuredList && !c.scheme.Recognizes(gvk) { + if _, isUnstructuredList := obj.(runtime.Unstructured); isUnstructuredList && !c.scheme.Recognizes(listGVK) { // We need to register the ListKind with UnstructuredList: // https://github.com/kubernetes/kubernetes/blob/7b2776b89fb1be28d4e9203bdeec079be903c103/staging/src/k8s.io/client-go/dynamic/fake/simple.go#L44-L51 c.schemeLock.RUnlock() @@ -602,39 +715,34 @@ func (c *fakeClient) List(ctx context.Context, obj client.ObjectList, opts ...cl return err } - if _, isUnstructured := obj.(runtime.Unstructured); isUnstructured { - ta, err := meta.TypeAccessor(o) - if err != nil { - return err - } - ta.SetKind(originalKind) - ta.SetAPIVersion(gvk.GroupVersion().String()) - } - j, err := json.Marshal(o) if err != nil { return err } zero(obj) + if err := ensureTypeMeta(obj, originalGVK); err != nil { + return err + } objCopy := obj.DeepCopyObject().(client.ObjectList) if err := json.Unmarshal(j, objCopy); err != nil { return err } - if _, isUnstructured := obj.(runtime.Unstructured); isUnstructured { - ta, err := meta.TypeAccessor(obj) - if err != nil { - return err - } - ta.SetKind(originalKind) - ta.SetAPIVersion(gvk.GroupVersion().String()) - } - objs, err := meta.ExtractList(objCopy) if err != nil { return err } + for _, o := range objs { + if err := ensureTypeMeta(o, gvk); err != nil { + return err + } + + if !c.returnManagedFields { + o.(metav1.Object).SetManagedFields(nil) + } + } + if listOpts.LabelSelector == nil && listOpts.FieldSelector == nil { return meta.SetList(obj, objs) } @@ -741,8 +849,13 @@ func (c *fakeClient) IsObjectNamespaced(obj runtime.Object) (bool, error) { } func (c *fakeClient) Create(ctx context.Context, obj client.Object, opts ...client.CreateOption) error { + if err := c.addToSchemeIfUnknownAndUnstructuredOrPartial(obj); err != nil { + return err + } + c.schemeLock.RLock() defer c.schemeLock.RUnlock() + createOptions := &client.CreateOptions{} createOptions.ApplyOptions(opts) @@ -773,14 +886,35 @@ func (c *fakeClient) Create(ctx context.Context, obj client.Object, opts ...clie accessor.SetDeletionTimestamp(nil) } + gvk, err := apiutil.GVKForObject(obj, c.scheme) + if err != nil { + return err + } + c.trackerWriteLock.Lock() defer c.trackerWriteLock.Unlock() - return c.tracker.Create(gvr, obj, accessor.GetNamespace()) + + if err := c.tracker.Create(gvr, obj, accessor.GetNamespace(), *createOptions.AsCreateOptions()); err != nil { + // The managed fields tracker sets gvk even on errors + _ = ensureTypeMeta(obj, gvk) + return err + } + + if !c.returnManagedFields { + obj.SetManagedFields(nil) + } + + return ensureTypeMeta(obj, gvk) } func (c *fakeClient) Delete(ctx context.Context, obj client.Object, opts ...client.DeleteOption) error { + if err := c.addToSchemeIfUnknownAndUnstructuredOrPartial(obj); err != nil { + return err + } + c.schemeLock.RLock() defer c.schemeLock.RUnlock() + gvr, err := 
getGVRFromObject(obj, c.scheme) if err != nil { return err @@ -826,8 +960,13 @@ func (c *fakeClient) Delete(ctx context.Context, obj client.Object, opts ...clie } func (c *fakeClient) DeleteAllOf(ctx context.Context, obj client.Object, opts ...client.DeleteAllOfOption) error { + if err := c.addToSchemeIfUnknownAndUnstructuredOrPartial(obj); err != nil { + return err + } + c.schemeLock.RLock() defer c.schemeLock.RUnlock() + gvk, err := apiutil.GVKForObject(obj, c.scheme) if err != nil { return err @@ -877,8 +1016,13 @@ func (c *fakeClient) Update(ctx context.Context, obj client.Object, opts ...clie } func (c *fakeClient) update(obj client.Object, isStatus bool, opts ...client.UpdateOption) error { + if err := c.addToSchemeIfUnknownAndUnstructuredOrPartial(obj); err != nil { + return err + } + c.schemeLock.RLock() defer c.schemeLock.RUnlock() + updateOptions := &client.UpdateOptions{} updateOptions.ApplyOptions(opts) @@ -892,6 +1036,10 @@ func (c *fakeClient) update(obj client.Object, isStatus bool, opts ...client.Upd if err != nil { return err } + gvk, err := apiutil.GVKForObject(obj, c.scheme) + if err != nil { + return err + } accessor, err := meta.Accessor(obj) if err != nil { return err @@ -899,19 +1047,100 @@ func (c *fakeClient) update(obj client.Object, isStatus bool, opts ...client.Upd c.trackerWriteLock.Lock() defer c.trackerWriteLock.Unlock() - return c.tracker.update(gvr, obj, accessor.GetNamespace(), isStatus, false, *updateOptions.AsUpdateOptions()) + + // Retain managed fields + // We can ignore all errors here since update will fail if we encounter an error. + obj.SetManagedFields(nil) + current, _ := c.tracker.Get(gvr, accessor.GetNamespace(), accessor.GetName()) + if currentMetaObj, ok := current.(metav1.Object); ok { + obj.SetManagedFields(currentMetaObj.GetManagedFields()) + } + + if err := c.tracker.update(gvr, obj, accessor.GetNamespace(), isStatus, false, *updateOptions.AsUpdateOptions()); err != nil { + return err + } + + if !c.returnManagedFields { + obj.SetManagedFields(nil) + } + + return ensureTypeMeta(obj, gvk) } func (c *fakeClient) Patch(ctx context.Context, obj client.Object, patch client.Patch, opts ...client.PatchOption) error { return c.patch(obj, patch, opts...) } +func (c *fakeClient) Apply(ctx context.Context, obj runtime.ApplyConfiguration, opts ...client.ApplyOption) error { + applyOpts := &client.ApplyOptions{} + applyOpts.ApplyOptions(opts) + + data, err := json.Marshal(obj) + if err != nil { + return fmt.Errorf("failed to marshal apply configuration: %w", err) + } + + u := &unstructured.Unstructured{} + if err := json.Unmarshal(data, u); err != nil { + return fmt.Errorf("failed to unmarshal apply configuration: %w", err) + } + + applyPatch := &fakeApplyPatch{} + + patchOpts := &client.PatchOptions{} + patchOpts.Raw = applyOpts.AsPatchOptions() + + if err := c.patch(u, applyPatch, patchOpts); err != nil { + return err + } + + acJSON, err := json.Marshal(u) + if err != nil { + return fmt.Errorf("failed to marshal patched object: %w", err) + } + + // We have to zero the object in case it contained a status and there is a + // status subresource. If its the private `unstructuredApplyConfiguration` + // we can not zero all of it, as that will cause the embedded Unstructured + // to be nil which then causes a NPD in the json.Unmarshal below. 
+ switch reflect.TypeOf(obj).String() { + case "*client.unstructuredApplyConfiguration": + zero(reflect.ValueOf(obj).Elem().FieldByName("Unstructured").Interface()) + default: + zero(obj) + } + if err := json.Unmarshal(acJSON, obj); err != nil { + return fmt.Errorf("failed to unmarshal patched object: %w", err) + } + + return nil +} + +type fakeApplyPatch struct{} + +func (p *fakeApplyPatch) Type() types.PatchType { + return types.ApplyPatchType +} + +func (p *fakeApplyPatch) Data(obj client.Object) ([]byte, error) { + return json.Marshal(obj) +} + func (c *fakeClient) patch(obj client.Object, patch client.Patch, opts ...client.PatchOption) error { - c.schemeLock.RLock() - defer c.schemeLock.RUnlock() + if err := c.addToSchemeIfUnknownAndUnstructuredOrPartial(obj); err != nil { + return err + } + patchOptions := &client.PatchOptions{} patchOptions.ApplyOptions(opts) + if errs := validation.ValidatePatchOptions(patchOptions.AsPatchOptions(), patch.Type()); len(errs) > 0 { + return apierrors.NewInvalid(schema.GroupKind{Group: "meta.k8s.io", Kind: "PatchOptions"}, "", errs) + } + + c.schemeLock.RLock() + defer c.schemeLock.RUnlock() + for _, dryRunOpt := range patchOptions.DryRun { if dryRunOpt == metav1.DryRunAll { return nil @@ -922,51 +1151,77 @@ func (c *fakeClient) patch(obj client.Object, patch client.Patch, opts ...client if err != nil { return err } - accessor, err := meta.Accessor(obj) - if err != nil { - return err - } - data, err := patch.Data(obj) + gvk, err := apiutil.GVKForObject(obj, c.scheme) if err != nil { return err } - - gvk, err := apiutil.GVKForObject(obj, c.scheme) + accessor, err := meta.Accessor(obj) if err != nil { return err } + var isApplyCreate bool c.trackerWriteLock.Lock() defer c.trackerWriteLock.Unlock() oldObj, err := c.tracker.Get(gvr, accessor.GetNamespace(), accessor.GetName()) if err != nil { - return err + if !apierrors.IsNotFound(err) || patch.Type() != types.ApplyPatchType { + return err + } + oldObj = &unstructured.Unstructured{} + isApplyCreate = true } oldAccessor, err := meta.Accessor(oldObj) if err != nil { return err } - // Apply patch without updating object. - // To remain in accordance with the behavior of k8s api behavior, - // a patch must not allow for changes to the deletionTimestamp of an object. - // The reaction() function applies the patch to the object and calls Update(), - // whereas dryPatch() replicates this behavior but skips the call to Update(). - // This ensures that the patch may be rejected if a deletionTimestamp is modified, prior - // to updating the object. - action := testing.NewPatchAction(gvr, accessor.GetNamespace(), accessor.GetName(), patch.Type(), data) - o, err := dryPatch(action, c.tracker) - if err != nil { - return err + // SSA deletionTimestamp updates are silently ignored + if patch.Type() == types.ApplyPatchType && !isApplyCreate { + obj.SetDeletionTimestamp(oldAccessor.GetDeletionTimestamp()) } - newObj, err := meta.Accessor(o) + + data, err := patch.Data(obj) if err != nil { return err } - // Validate that deletionTimestamp has not been changed - if !deletionTimestampEqual(newObj, oldAccessor) { - return fmt.Errorf("rejected patch, metadata.deletionTimestamp immutable") + action := testing.NewPatchActionWithOptions( + gvr, + accessor.GetNamespace(), + accessor.GetName(), + patch.Type(), + data, + *patchOptions.AsPatchOptions(), + ) + + // Apply is implemented in the tracker and calling it has side-effects + // such as bumping RV and updating managedFields timestamps, hence we + // can not dry-run it. 
Luckily, the only validation we use it for + // doesn't apply to SSA - Creating objects with non-nil deletionTimestamp + // through SSA is possible and updating the deletionTimestamp is valid, + // but has no effect. + if patch.Type() != types.ApplyPatchType { + // Apply patch without updating object. + // To remain in accordance with the behavior of k8s api behavior, + // a patch must not allow for changes to the deletionTimestamp of an object. + // The reaction() function applies the patch to the object and calls Update(), + // whereas dryPatch() replicates this behavior but skips the call to Update(). + // This ensures that the patch may be rejected if a deletionTimestamp is modified, prior + // to updating the object. + o, err := dryPatch(action, c.tracker) + if err != nil { + return err + } + newObj, err := meta.Accessor(o) + if err != nil { + return err + } + + // Validate that deletionTimestamp has not been changed + if !deletionTimestampEqual(newObj, oldAccessor) { + return fmt.Errorf("rejected patch, metadata.deletionTimestamp immutable") + } } reaction := testing.ObjectReaction(c.tracker) @@ -978,21 +1233,28 @@ func (c *fakeClient) patch(obj client.Object, patch client.Patch, opts ...client panic("tracker could not handle patch method") } - if _, isUnstructured := obj.(runtime.Unstructured); isUnstructured { - ta, err := meta.TypeAccessor(o) - if err != nil { - return err - } - ta.SetKind(gvk.Kind) - ta.SetAPIVersion(gvk.GroupVersion().String()) + ta, err := meta.TypeAccessor(o) + if err != nil { + return err } + ta.SetAPIVersion(gvk.GroupVersion().String()) + ta.SetKind(gvk.Kind) + j, err := json.Marshal(o) if err != nil { return err } zero(obj) - return json.Unmarshal(j, obj) + if err := json.Unmarshal(j, obj); err != nil { + return err + } + + if !c.returnManagedFields { + obj.SetManagedFields(nil) + } + + return ensureTypeMeta(obj, gvk) } // Applying a patch results in a deletionTimestamp that is truncated to the nearest second. @@ -1020,6 +1282,9 @@ func dryPatch(action testing.PatchActionImpl, tracker testing.ObjectTracker) (ru obj, err := tracker.Get(gvr, ns, action.GetName()) if err != nil { + if apierrors.IsNotFound(err) && action.GetPatchType() == types.ApplyPatchType { + return &unstructured.Unstructured{}, nil + } return nil, err } @@ -1064,10 +1329,10 @@ func dryPatch(action testing.PatchActionImpl, tracker testing.ObjectTracker) (ru if err = json.Unmarshal(mergedByte, obj); err != nil { return nil, err } - case types.ApplyPatchType: - return nil, errors.New("apply patches are not supported in the fake client. 
Follow https://github.com/kubernetes/kubernetes/issues/115598 for the current status") case types.ApplyCBORPatchType: return nil, errors.New("apply CBOR patches are not supported in the fake client") + case types.ApplyPatchType: + return nil, errors.New("bug in controller-runtime: should not end up in dryPatch for SSA") default: return nil, fmt.Errorf("%s PatchType is not supported", action.GetPatchType()) } @@ -1600,3 +1865,47 @@ func AddIndex(c client.Client, obj runtime.Object, field string, extractValue cl return nil } + +func (c *fakeClient) addToSchemeIfUnknownAndUnstructuredOrPartial(obj runtime.Object) error { + c.schemeLock.Lock() + defer c.schemeLock.Unlock() + + _, isUnstructured := obj.(*unstructured.Unstructured) + _, isUnstructuredList := obj.(*unstructured.UnstructuredList) + _, isPartial := obj.(*metav1.PartialObjectMetadata) + _, isPartialList := obj.(*metav1.PartialObjectMetadataList) + if !isUnstructured && !isUnstructuredList && !isPartial && !isPartialList { + return nil + } + + gvk, err := apiutil.GVKForObject(obj, c.scheme) + if err != nil { + return err + } + + if !c.scheme.Recognizes(gvk) { + c.scheme.AddKnownTypeWithName(gvk, obj) + } + + return nil +} + +func ensureTypeMeta(obj runtime.Object, gvk schema.GroupVersionKind) error { + ta, err := meta.TypeAccessor(obj) + if err != nil { + return err + } + _, isUnstructured := obj.(runtime.Unstructured) + _, isPartialObject := obj.(*metav1.PartialObjectMetadata) + _, isPartialObjectList := obj.(*metav1.PartialObjectMetadataList) + if !isUnstructured && !isPartialObject && !isPartialObjectList { + ta.SetKind("") + ta.SetAPIVersion("") + return nil + } + + ta.SetKind(gvk.Kind) + ta.SetAPIVersion(gvk.GroupVersion().String()) + + return nil +} diff --git a/vendor/sigs.k8s.io/controller-runtime/pkg/client/fake/typeconverter.go b/vendor/sigs.k8s.io/controller-runtime/pkg/client/fake/typeconverter.go new file mode 100644 index 000000000000..3cb3a0dc77cf --- /dev/null +++ b/vendor/sigs.k8s.io/controller-runtime/pkg/client/fake/typeconverter.go @@ -0,0 +1,60 @@ +/* +Copyright 2025 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package fake + +import ( + "fmt" + + "k8s.io/apimachinery/pkg/runtime" + kerrors "k8s.io/apimachinery/pkg/util/errors" + "k8s.io/apimachinery/pkg/util/managedfields" + "sigs.k8s.io/structured-merge-diff/v6/typed" +) + +type multiTypeConverter struct { + upstream []managedfields.TypeConverter +} + +func (m multiTypeConverter) ObjectToTyped(r runtime.Object, o ...typed.ValidationOptions) (*typed.TypedValue, error) { + var errs []error + for _, u := range m.upstream { + res, err := u.ObjectToTyped(r, o...) 
+ if err != nil { + errs = append(errs, err) + continue + } + + return res, nil + } + + return nil, fmt.Errorf("failed to convert Object to TypedValue: %w", kerrors.NewAggregate(errs)) +} + +func (m multiTypeConverter) TypedToObject(v *typed.TypedValue) (runtime.Object, error) { + var errs []error + for _, u := range m.upstream { + res, err := u.TypedToObject(v) + if err != nil { + errs = append(errs, err) + continue + } + + return res, nil + } + + return nil, fmt.Errorf("failed to convert TypedValue to Object: %w", kerrors.NewAggregate(errs)) +} diff --git a/vendor/sigs.k8s.io/controller-runtime/pkg/client/fieldowner.go b/vendor/sigs.k8s.io/controller-runtime/pkg/client/fieldowner.go index 07183cd192ef..93274f950071 100644 --- a/vendor/sigs.k8s.io/controller-runtime/pkg/client/fieldowner.go +++ b/vendor/sigs.k8s.io/controller-runtime/pkg/client/fieldowner.go @@ -54,6 +54,10 @@ func (f *clientWithFieldManager) Patch(ctx context.Context, obj Object, patch Pa return f.c.Patch(ctx, obj, patch, append([]PatchOption{FieldOwner(f.owner)}, opts...)...) } +func (f *clientWithFieldManager) Apply(ctx context.Context, obj runtime.ApplyConfiguration, opts ...ApplyOption) error { + return f.c.Apply(ctx, obj, append([]ApplyOption{FieldOwner(f.owner)}, opts...)...) +} + func (f *clientWithFieldManager) Delete(ctx context.Context, obj Object, opts ...DeleteOption) error { return f.c.Delete(ctx, obj, opts...) } diff --git a/vendor/sigs.k8s.io/controller-runtime/pkg/client/fieldvalidation.go b/vendor/sigs.k8s.io/controller-runtime/pkg/client/fieldvalidation.go index 659b3d44c916..ce8d0576c7c4 100644 --- a/vendor/sigs.k8s.io/controller-runtime/pkg/client/fieldvalidation.go +++ b/vendor/sigs.k8s.io/controller-runtime/pkg/client/fieldvalidation.go @@ -53,6 +53,10 @@ func (c *clientWithFieldValidation) Patch(ctx context.Context, obj Object, patch return c.client.Patch(ctx, obj, patch, append([]PatchOption{c.validation}, opts...)...) } +func (c *clientWithFieldValidation) Apply(ctx context.Context, obj runtime.ApplyConfiguration, opts ...ApplyOption) error { + return c.client.Apply(ctx, obj, opts...) +} + func (c *clientWithFieldValidation) Delete(ctx context.Context, obj Object, opts ...DeleteOption) error { return c.client.Delete(ctx, obj, opts...) 
} diff --git a/vendor/sigs.k8s.io/controller-runtime/pkg/client/interceptor/intercept.go b/vendor/sigs.k8s.io/controller-runtime/pkg/client/interceptor/intercept.go index 3d3f3cb0118d..7ff73bd8daa5 100644 --- a/vendor/sigs.k8s.io/controller-runtime/pkg/client/interceptor/intercept.go +++ b/vendor/sigs.k8s.io/controller-runtime/pkg/client/interceptor/intercept.go @@ -19,6 +19,7 @@ type Funcs struct { DeleteAllOf func(ctx context.Context, client client.WithWatch, obj client.Object, opts ...client.DeleteAllOfOption) error Update func(ctx context.Context, client client.WithWatch, obj client.Object, opts ...client.UpdateOption) error Patch func(ctx context.Context, client client.WithWatch, obj client.Object, patch client.Patch, opts ...client.PatchOption) error + Apply func(ctx context.Context, client client.WithWatch, obj runtime.ApplyConfiguration, opts ...client.ApplyOption) error Watch func(ctx context.Context, client client.WithWatch, obj client.ObjectList, opts ...client.ListOption) (watch.Interface, error) SubResource func(client client.WithWatch, subResource string) client.SubResourceClient SubResourceGet func(ctx context.Context, client client.Client, subResourceName string, obj client.Object, subResource client.Object, opts ...client.SubResourceGetOption) error @@ -92,6 +93,14 @@ func (c interceptor) Patch(ctx context.Context, obj client.Object, patch client. return c.client.Patch(ctx, obj, patch, opts...) } +func (c interceptor) Apply(ctx context.Context, obj runtime.ApplyConfiguration, opts ...client.ApplyOption) error { + if c.funcs.Apply != nil { + return c.funcs.Apply(ctx, c.client, obj, opts...) + } + + return c.client.Apply(ctx, obj, opts...) +} + func (c interceptor) DeleteAllOf(ctx context.Context, obj client.Object, opts ...client.DeleteAllOfOption) error { if c.funcs.DeleteAllOf != nil { return c.funcs.DeleteAllOf(ctx, c.client, obj, opts...) diff --git a/vendor/sigs.k8s.io/controller-runtime/pkg/client/interfaces.go b/vendor/sigs.k8s.io/controller-runtime/pkg/client/interfaces.go index 3b282fc2c56f..61559ecbe10b 100644 --- a/vendor/sigs.k8s.io/controller-runtime/pkg/client/interfaces.go +++ b/vendor/sigs.k8s.io/controller-runtime/pkg/client/interfaces.go @@ -61,6 +61,9 @@ type Reader interface { // Writer knows how to create, delete, and update Kubernetes objects. type Writer interface { + // Apply applies the given apply configuration to the Kubernetes cluster. + Apply(ctx context.Context, obj runtime.ApplyConfiguration, opts ...ApplyOption) error + // Create saves the object obj in the Kubernetes cluster. obj must be a // struct pointer so that obj can be updated with the content returned by the Server. Create(ctx context.Context, obj Object, opts ...CreateOption) error diff --git a/vendor/sigs.k8s.io/controller-runtime/pkg/client/namespaced_client.go b/vendor/sigs.k8s.io/controller-runtime/pkg/client/namespaced_client.go index 222dc795793c..d4223eda26c8 100644 --- a/vendor/sigs.k8s.io/controller-runtime/pkg/client/namespaced_client.go +++ b/vendor/sigs.k8s.io/controller-runtime/pkg/client/namespaced_client.go @@ -19,10 +19,13 @@ package client import ( "context" "fmt" + "reflect" "k8s.io/apimachinery/pkg/api/meta" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/utils/ptr" + "sigs.k8s.io/controller-runtime/pkg/client/apiutil" ) // NewNamespacedClient wraps an existing client enforcing the namespace value. 
@@ -147,6 +150,52 @@ func (n *namespacedClient) Patch(ctx context.Context, obj Object, patch Patch, o return n.client.Patch(ctx, obj, patch, opts...) } +func (n *namespacedClient) Apply(ctx context.Context, obj runtime.ApplyConfiguration, opts ...ApplyOption) error { + var gvk schema.GroupVersionKind + switch o := obj.(type) { + case applyConfiguration: + var err error + gvk, err = gvkFromApplyConfiguration(o) + if err != nil { + return err + } + case *unstructuredApplyConfiguration: + gvk = o.GroupVersionKind() + default: + return fmt.Errorf("object %T is not a valid apply configuration", obj) + } + isNamespaceScoped, err := apiutil.IsGVKNamespaced(gvk, n.RESTMapper()) + if err != nil { + return fmt.Errorf("error finding the scope of the object: %w", err) + } + if isNamespaceScoped { + switch o := obj.(type) { + case applyConfiguration: + if o.GetNamespace() != nil && *o.GetNamespace() != "" && *o.GetNamespace() != n.namespace { + return fmt.Errorf("namespace %s provided for the object %s does not match the namespace %s on the client", + *o.GetNamespace(), ptr.Deref(o.GetName(), ""), n.namespace) + } + v := reflect.ValueOf(o) + withNamespace := v.MethodByName("WithNamespace") + if !withNamespace.IsValid() { + return fmt.Errorf("ApplyConfiguration %T does not have a WithNamespace method", o) + } + if tp := withNamespace.Type(); tp.NumIn() != 1 || tp.In(0).Kind() != reflect.String { + return fmt.Errorf("WithNamespace method of ApplyConfiguration %T must take a single string argument", o) + } + withNamespace.Call([]reflect.Value{reflect.ValueOf(n.namespace)}) + case *unstructuredApplyConfiguration: + if o.GetNamespace() != "" && o.GetNamespace() != n.namespace { + return fmt.Errorf("namespace %s provided for the object %s does not match the namespace %s on the client", + o.GetNamespace(), o.GetName(), n.namespace) + } + o.SetNamespace(n.namespace) + } + } + + return n.client.Apply(ctx, obj, opts...) +} + // Get implements client.Client. func (n *namespacedClient) Get(ctx context.Context, key ObjectKey, obj Object, opts ...GetOption) error { isNamespaceScoped, err := n.IsObjectNamespaced(obj) @@ -164,7 +213,12 @@ func (n *namespacedClient) Get(ctx context.Context, key ObjectKey, obj Object, o // List implements client.Client. func (n *namespacedClient) List(ctx context.Context, obj ObjectList, opts ...ListOption) error { - if n.namespace != "" { + isNamespaceScoped, err := n.IsObjectNamespaced(obj) + if err != nil { + return fmt.Errorf("error finding the scope of the object: %w", err) + } + + if isNamespaceScoped && n.namespace != "" { opts = append(opts, InNamespace(n.namespace)) } return n.client.List(ctx, obj, opts...) diff --git a/vendor/sigs.k8s.io/controller-runtime/pkg/client/options.go b/vendor/sigs.k8s.io/controller-runtime/pkg/client/options.go index db50ed8febe5..33c460738cfe 100644 --- a/vendor/sigs.k8s.io/controller-runtime/pkg/client/options.go +++ b/vendor/sigs.k8s.io/controller-runtime/pkg/client/options.go @@ -21,6 +21,7 @@ import ( "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/selection" + "k8s.io/utils/ptr" ) // {{{ "Functional" Option Interfaces @@ -61,6 +62,12 @@ type PatchOption interface { ApplyToPatch(*PatchOptions) } +// ApplyOption is some configuration that modifies options for an apply request. +type ApplyOption interface { + // ApplyToApply applies this configuration to the given apply options. + ApplyToApply(*ApplyOptions) +} + // DeleteAllOfOption is some configuration that modifies options for a delete request. 
type DeleteAllOfOption interface { // ApplyToDeleteAllOf applies this configuration to the given deletecollection options. @@ -115,7 +122,12 @@ func (dryRunAll) ApplyToPatch(opts *PatchOptions) { opts.DryRun = []string{metav1.DryRunAll} } -// ApplyToPatch applies this configuration to the given delete options. +// ApplyToApply applies this configuration to the given apply options. +func (dryRunAll) ApplyToApply(opts *ApplyOptions) { + opts.DryRun = []string{metav1.DryRunAll} +} + +// ApplyToDelete applies this configuration to the given delete options. func (dryRunAll) ApplyToDelete(opts *DeleteOptions) { opts.DryRun = []string{metav1.DryRunAll} } @@ -154,6 +166,11 @@ func (f FieldOwner) ApplyToUpdate(opts *UpdateOptions) { opts.FieldManager = string(f) } +// ApplyToApply applies this configuration to the given apply options. +func (f FieldOwner) ApplyToApply(opts *ApplyOptions) { + opts.FieldManager = string(f) +} + // ApplyToSubResourcePatch applies this configuration to the given patch options. func (f FieldOwner) ApplyToSubResourcePatch(opts *SubResourcePatchOptions) { opts.FieldManager = string(f) @@ -431,6 +448,12 @@ type GetOptions struct { // Raw represents raw GetOptions, as passed to the API server. Note // that these may not be respected by all implementations of interface. Raw *metav1.GetOptions + + // UnsafeDisableDeepCopy indicates not to deep copy objects during get object. + // Be very careful with this, when enabled you must DeepCopy any object before mutating it, + // otherwise you will mutate the object in the cache. + // +optional + UnsafeDisableDeepCopy *bool } var _ GetOption = &GetOptions{} @@ -440,6 +463,9 @@ func (o *GetOptions) ApplyToGet(lo *GetOptions) { if o.Raw != nil { lo.Raw = o.Raw } + if o.UnsafeDisableDeepCopy != nil { + lo.UnsafeDisableDeepCopy = o.UnsafeDisableDeepCopy + } } // AsGetOptions returns these options as a flattened metav1.GetOptions. @@ -618,6 +644,9 @@ type MatchingLabelsSelector struct { // ApplyToList applies this configuration to the given list options. func (m MatchingLabelsSelector) ApplyToList(opts *ListOptions) { + if m.Selector == nil { + m.Selector = labels.Nothing() + } opts.LabelSelector = m } @@ -651,6 +680,9 @@ type MatchingFieldsSelector struct { // ApplyToList applies this configuration to the given list options. func (m MatchingFieldsSelector) ApplyToList(opts *ListOptions) { + if m.Selector == nil { + m.Selector = fields.Nothing() + } opts.FieldSelector = m } @@ -692,15 +724,14 @@ func (l Limit) ApplyToList(opts *ListOptions) { // otherwise you will mutate the object in the cache. type UnsafeDisableDeepCopyOption bool +// ApplyToGet applies this configuration to the given an Get options. +func (d UnsafeDisableDeepCopyOption) ApplyToGet(opts *GetOptions) { + opts.UnsafeDisableDeepCopy = ptr.To(bool(d)) +} + // ApplyToList applies this configuration to the given an List options. func (d UnsafeDisableDeepCopyOption) ApplyToList(opts *ListOptions) { - definitelyTrue := true - definitelyFalse := false - if d { - opts.UnsafeDisableDeepCopy = &definitelyTrue - } else { - opts.UnsafeDisableDeepCopy = &definitelyFalse - } + opts.UnsafeDisableDeepCopy = ptr.To(bool(d)) } // UnsafeDisableDeepCopy indicates not to deep copy objects during list objects. 
@@ -863,10 +894,18 @@ func (o *PatchOptions) AsPatchOptions() *metav1.PatchOptions { o.Raw = &metav1.PatchOptions{} } - o.Raw.DryRun = o.DryRun - o.Raw.Force = o.Force - o.Raw.FieldManager = o.FieldManager - o.Raw.FieldValidation = o.FieldValidation + if o.DryRun != nil { + o.Raw.DryRun = o.DryRun + } + if o.Force != nil { + o.Raw.Force = o.Force + } + if o.FieldManager != "" { + o.Raw.FieldManager = o.FieldManager + } + if o.FieldValidation != "" { + o.Raw.FieldValidation = o.FieldValidation + } return o.Raw } @@ -899,13 +938,15 @@ var ForceOwnership = forceOwnership{} type forceOwnership struct{} func (forceOwnership) ApplyToPatch(opts *PatchOptions) { - definitelyTrue := true - opts.Force = &definitelyTrue + opts.Force = ptr.To(true) } func (forceOwnership) ApplyToSubResourcePatch(opts *SubResourcePatchOptions) { - definitelyTrue := true - opts.Force = &definitelyTrue + opts.Force = ptr.To(true) +} + +func (forceOwnership) ApplyToApply(opts *ApplyOptions) { + opts.Force = ptr.To(true) } // }}} @@ -939,3 +980,57 @@ func (o *DeleteAllOfOptions) ApplyToDeleteAllOf(do *DeleteAllOfOptions) { } // }}} + +// ApplyOptions are the options for an apply request. +type ApplyOptions struct { + // When present, indicates that modifications should not be + // persisted. An invalid or unrecognized dryRun directive will + // result in an error response and no further processing of the + // request. Valid values are: + // - All: all dry run stages will be processed + DryRun []string + + // Force is going to "force" Apply requests. It means user will + // re-acquire conflicting fields owned by other people. + Force *bool + + // fieldManager is a name associated with the actor or entity + // that is making these changes. The value must be less than or + // 128 characters long, and only contain printable characters, + // as defined by https://golang.org/pkg/unicode/#IsPrint. This + // field is required. + // + // +required + FieldManager string +} + +// ApplyOptions applies the given opts onto the ApplyOptions +func (o *ApplyOptions) ApplyOptions(opts []ApplyOption) *ApplyOptions { + for _, opt := range opts { + opt.ApplyToApply(o) + } + return o +} + +// ApplyToApply applies the given opts onto the ApplyOptions +func (o *ApplyOptions) ApplyToApply(opts *ApplyOptions) { + if o.DryRun != nil { + opts.DryRun = o.DryRun + } + if o.Force != nil { + opts.Force = o.Force + } + + if o.FieldManager != "" { + opts.FieldManager = o.FieldManager + } +} + +// AsPatchOptions constructs patch options from the given ApplyOptions +func (o *ApplyOptions) AsPatchOptions() *metav1.PatchOptions { + return &metav1.PatchOptions{ + DryRun: o.DryRun, + Force: o.Force, + FieldManager: o.FieldManager, + } +} diff --git a/vendor/sigs.k8s.io/controller-runtime/pkg/client/patch.go b/vendor/sigs.k8s.io/controller-runtime/pkg/client/patch.go index 11d608388562..b99d7663bdff 100644 --- a/vendor/sigs.k8s.io/controller-runtime/pkg/client/patch.go +++ b/vendor/sigs.k8s.io/controller-runtime/pkg/client/patch.go @@ -27,6 +27,11 @@ import ( var ( // Apply uses server-side apply to patch the given object. + // + // This should now only be used to patch sub resources, e.g. with client.Client.Status().Patch(). + // Use client.Client.Apply() instead of client.Client.Patch(..., client.Apply, ...) + // This will be deprecated once the Apply method has been added for sub resources. 
+ // See the following issue for more details: https://github.com/kubernetes-sigs/controller-runtime/issues/3183 Apply Patch = applyPatch{} // Merge uses the raw object as a merge patch, without modifications. diff --git a/vendor/sigs.k8s.io/controller-runtime/pkg/client/typed_client.go b/vendor/sigs.k8s.io/controller-runtime/pkg/client/typed_client.go index 92afd9a9c258..3bd762a6382c 100644 --- a/vendor/sigs.k8s.io/controller-runtime/pkg/client/typed_client.go +++ b/vendor/sigs.k8s.io/controller-runtime/pkg/client/typed_client.go @@ -18,8 +18,10 @@ package client import ( "context" + "fmt" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/client-go/util/apply" ) var _ Reader = &typedClient{} @@ -41,7 +43,7 @@ func (c *typedClient) Create(ctx context.Context, obj Object, opts ...CreateOpti createOpts.ApplyOptions(opts) return o.Post(). - NamespaceIfScoped(o.GetNamespace(), o.isNamespaced()). + NamespaceIfScoped(o.namespace, o.isNamespaced()). Resource(o.resource()). Body(obj). VersionedParams(createOpts.AsCreateOptions(), c.paramCodec). @@ -60,9 +62,9 @@ func (c *typedClient) Update(ctx context.Context, obj Object, opts ...UpdateOpti updateOpts.ApplyOptions(opts) return o.Put(). - NamespaceIfScoped(o.GetNamespace(), o.isNamespaced()). + NamespaceIfScoped(o.namespace, o.isNamespaced()). Resource(o.resource()). - Name(o.GetName()). + Name(o.name). Body(obj). VersionedParams(updateOpts.AsUpdateOptions(), c.paramCodec). Do(ctx). @@ -80,9 +82,9 @@ func (c *typedClient) Delete(ctx context.Context, obj Object, opts ...DeleteOpti deleteOpts.ApplyOptions(opts) return o.Delete(). - NamespaceIfScoped(o.GetNamespace(), o.isNamespaced()). + NamespaceIfScoped(o.namespace, o.isNamespaced()). Resource(o.resource()). - Name(o.GetName()). + Name(o.name). Body(deleteOpts.AsDeleteOptions()). Do(ctx). Error() @@ -123,15 +125,40 @@ func (c *typedClient) Patch(ctx context.Context, obj Object, patch Patch, opts . patchOpts.ApplyOptions(opts) return o.Patch(patch.Type()). - NamespaceIfScoped(o.GetNamespace(), o.isNamespaced()). + NamespaceIfScoped(o.namespace, o.isNamespaced()). Resource(o.resource()). - Name(o.GetName()). + Name(o.name). VersionedParams(patchOpts.AsPatchOptions(), c.paramCodec). Body(data). Do(ctx). Into(obj) } +func (c *typedClient) Apply(ctx context.Context, obj runtime.ApplyConfiguration, opts ...ApplyOption) error { + o, err := c.resources.getObjMeta(obj) + if err != nil { + return err + } + req, err := apply.NewRequest(o, obj) + if err != nil { + return fmt.Errorf("failed to create apply request: %w", err) + } + applyOpts := &ApplyOptions{} + applyOpts.ApplyOptions(opts) + + return req. + NamespaceIfScoped(o.namespace, o.isNamespaced()). + Resource(o.resource()). + Name(o.name). + VersionedParams(applyOpts.AsPatchOptions(), c.paramCodec). + Do(ctx). + // This is hacky, it is required because `Into` takes a `runtime.Object` and + // that is not implemented by the ApplyConfigurations. The generated clients + // don't have this problem because they deserialize into the api type, not the + // apply configuration: https://github.com/kubernetes/kubernetes/blob/22f5e01a37c0bc6a5f494dec14dd4e3688ee1d55/staging/src/k8s.io/client-go/gentype/type.go#L296-L317 + Into(runtimeObjectFromApplyConfiguration(obj)) +} + // Get implements client.Client. 
func (c *typedClient) Get(ctx context.Context, key ObjectKey, obj Object, opts ...GetOption) error { r, err := c.resources.getResource(obj) @@ -179,9 +206,9 @@ func (c *typedClient) GetSubResource(ctx context.Context, obj, subResourceObj Ob getOpts.ApplyOptions(opts) return o.Get(). - NamespaceIfScoped(o.GetNamespace(), o.isNamespaced()). + NamespaceIfScoped(o.namespace, o.isNamespaced()). Resource(o.resource()). - Name(o.GetName()). + Name(o.name). SubResource(subResource). VersionedParams(getOpts.AsGetOptions(), c.paramCodec). Do(ctx). @@ -202,9 +229,9 @@ func (c *typedClient) CreateSubResource(ctx context.Context, obj Object, subReso createOpts.ApplyOptions(opts) return o.Post(). - NamespaceIfScoped(o.GetNamespace(), o.isNamespaced()). + NamespaceIfScoped(o.namespace, o.isNamespaced()). Resource(o.resource()). - Name(o.GetName()). + Name(o.name). SubResource(subResource). Body(subResourceObj). VersionedParams(createOpts.AsCreateOptions(), c.paramCodec). @@ -237,9 +264,9 @@ func (c *typedClient) UpdateSubResource(ctx context.Context, obj Object, subReso } return o.Put(). - NamespaceIfScoped(o.GetNamespace(), o.isNamespaced()). + NamespaceIfScoped(o.namespace, o.isNamespaced()). Resource(o.resource()). - Name(o.GetName()). + Name(o.name). SubResource(subResource). Body(body). VersionedParams(updateOpts.AsUpdateOptions(), c.paramCodec). @@ -268,9 +295,9 @@ func (c *typedClient) PatchSubResource(ctx context.Context, obj Object, subResou } return o.Patch(patch.Type()). - NamespaceIfScoped(o.GetNamespace(), o.isNamespaced()). + NamespaceIfScoped(o.namespace, o.isNamespaced()). Resource(o.resource()). - Name(o.GetName()). + Name(o.name). SubResource(subResource). Body(data). VersionedParams(patchOpts.AsPatchOptions(), c.paramCodec). diff --git a/vendor/sigs.k8s.io/controller-runtime/pkg/client/unstructured_client.go b/vendor/sigs.k8s.io/controller-runtime/pkg/client/unstructured_client.go index 0d96951780ee..e636c3beef25 100644 --- a/vendor/sigs.k8s.io/controller-runtime/pkg/client/unstructured_client.go +++ b/vendor/sigs.k8s.io/controller-runtime/pkg/client/unstructured_client.go @@ -22,6 +22,7 @@ import ( "strings" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/client-go/util/apply" ) var _ Reader = &unstructuredClient{} @@ -50,7 +51,7 @@ func (uc *unstructuredClient) Create(ctx context.Context, obj Object, opts ...Cr createOpts.ApplyOptions(opts) result := o.Post(). - NamespaceIfScoped(o.GetNamespace(), o.isNamespaced()). + NamespaceIfScoped(o.namespace, o.isNamespaced()). Resource(o.resource()). Body(obj). VersionedParams(createOpts.AsCreateOptions(), uc.paramCodec). @@ -79,9 +80,9 @@ func (uc *unstructuredClient) Update(ctx context.Context, obj Object, opts ...Up updateOpts.ApplyOptions(opts) result := o.Put(). - NamespaceIfScoped(o.GetNamespace(), o.isNamespaced()). + NamespaceIfScoped(o.namespace, o.isNamespaced()). Resource(o.resource()). - Name(o.GetName()). + Name(o.name). Body(obj). VersionedParams(updateOpts.AsUpdateOptions(), uc.paramCodec). Do(ctx). @@ -106,9 +107,9 @@ func (uc *unstructuredClient) Delete(ctx context.Context, obj Object, opts ...De deleteOpts.ApplyOptions(opts) return o.Delete(). - NamespaceIfScoped(o.GetNamespace(), o.isNamespaced()). + NamespaceIfScoped(o.namespace, o.isNamespaced()). Resource(o.resource()). - Name(o.GetName()). + Name(o.name). Body(deleteOpts.AsDeleteOptions()). Do(ctx). Error() @@ -157,15 +158,41 @@ func (uc *unstructuredClient) Patch(ctx context.Context, obj Object, patch Patch patchOpts.ApplyOptions(opts) return o.Patch(patch.Type()). 
- NamespaceIfScoped(o.GetNamespace(), o.isNamespaced()). + NamespaceIfScoped(o.namespace, o.isNamespaced()). Resource(o.resource()). - Name(o.GetName()). + Name(o.name). VersionedParams(patchOpts.AsPatchOptions(), uc.paramCodec). Body(data). Do(ctx). Into(obj) } +func (uc *unstructuredClient) Apply(ctx context.Context, obj runtime.ApplyConfiguration, opts ...ApplyOption) error { + unstructuredApplyConfig, ok := obj.(*unstructuredApplyConfiguration) + if !ok { + return fmt.Errorf("bug: unstructured client got an applyconfiguration that was not %T but %T", &unstructuredApplyConfiguration{}, obj) + } + o, err := uc.resources.getObjMeta(unstructuredApplyConfig.Unstructured) + if err != nil { + return err + } + + req, err := apply.NewRequest(o, obj) + if err != nil { + return fmt.Errorf("failed to create apply request: %w", err) + } + applyOpts := &ApplyOptions{} + applyOpts.ApplyOptions(opts) + + return req. + NamespaceIfScoped(o.namespace, o.isNamespaced()). + Resource(o.resource()). + Name(o.name). + VersionedParams(applyOpts.AsPatchOptions(), uc.paramCodec). + Do(ctx). + Into(unstructuredApplyConfig.Unstructured) +} + // Get implements client.Client. func (uc *unstructuredClient) Get(ctx context.Context, key ObjectKey, obj Object, opts ...GetOption) error { u, ok := obj.(runtime.Unstructured) @@ -244,9 +271,9 @@ func (uc *unstructuredClient) GetSubResource(ctx context.Context, obj, subResour getOpts.ApplyOptions(opts) return o.Get(). - NamespaceIfScoped(o.GetNamespace(), o.isNamespaced()). + NamespaceIfScoped(o.namespace, o.isNamespaced()). Resource(o.resource()). - Name(o.GetName()). + Name(o.name). SubResource(subResource). VersionedParams(getOpts.AsGetOptions(), uc.paramCodec). Do(ctx). @@ -275,9 +302,9 @@ func (uc *unstructuredClient) CreateSubResource(ctx context.Context, obj, subRes createOpts.ApplyOptions(opts) return o.Post(). - NamespaceIfScoped(o.GetNamespace(), o.isNamespaced()). + NamespaceIfScoped(o.namespace, o.isNamespaced()). Resource(o.resource()). - Name(o.GetName()). + Name(o.name). SubResource(subResource). Body(subResourceObj). VersionedParams(createOpts.AsCreateOptions(), uc.paramCodec). @@ -310,9 +337,9 @@ func (uc *unstructuredClient) UpdateSubResource(ctx context.Context, obj Object, } return o.Put(). - NamespaceIfScoped(o.GetNamespace(), o.isNamespaced()). + NamespaceIfScoped(o.namespace, o.isNamespaced()). Resource(o.resource()). - Name(o.GetName()). + Name(o.name). SubResource(subResource). Body(body). VersionedParams(updateOpts.AsUpdateOptions(), uc.paramCodec). @@ -347,9 +374,9 @@ func (uc *unstructuredClient) PatchSubResource(ctx context.Context, obj Object, } result := o.Patch(patch.Type()). - NamespaceIfScoped(o.GetNamespace(), o.isNamespaced()). + NamespaceIfScoped(o.namespace, o.isNamespaced()). Resource(o.resource()). - Name(o.GetName()). + Name(o.name). SubResource(subResource). Body(data). VersionedParams(patchOpts.AsPatchOptions(), uc.paramCodec). diff --git a/vendor/sigs.k8s.io/controller-runtime/pkg/config/controller.go b/vendor/sigs.k8s.io/controller-runtime/pkg/config/controller.go index a5655593ef74..3dafaef93b6a 100644 --- a/vendor/sigs.k8s.io/controller-runtime/pkg/config/controller.go +++ b/vendor/sigs.k8s.io/controller-runtime/pkg/config/controller.go @@ -60,12 +60,33 @@ type Controller struct { // Defaults to true, which means the controller will use leader election. NeedLeaderElection *bool + // EnableWarmup specifies whether the controller should start its sources when the manager is not + // the leader. 
This is useful for cases where sources take a long time to start, as it allows + // for the controller to warm up its caches even before it is elected as the leader. This + // improves leadership failover time, as the caches will be prepopulated before the controller + // transitions to be leader. + // + // Setting EnableWarmup to true and NeedLeaderElection to true means the controller will start its + // sources without waiting to become leader. + // Setting EnableWarmup to true and NeedLeaderElection to false is a no-op as controllers without + // leader election do not wait on leader election to start their sources. + // Defaults to false. + // + // Note: This feature is currently in beta and subject to change. + // For more details, see: https://github.com/kubernetes-sigs/controller-runtime/issues/3220. + EnableWarmup *bool + // UsePriorityQueue configures the controllers queue to use the controller-runtime provided // priority queue. // - // Note: This flag is disabled by default until a future version. It's currently in beta. + // Note: This flag is disabled by default until a future version. This feature is currently in beta. + // For more details, see: https://github.com/kubernetes-sigs/controller-runtime/issues/2374. UsePriorityQueue *bool // Logger is the logger controllers should use. Logger logr.Logger + + // ReconciliationTimeout is used as the timeout passed to the context of each Reconcile call. + // By default, there is no timeout. + ReconciliationTimeout time.Duration } diff --git a/vendor/sigs.k8s.io/controller-runtime/pkg/controller/controller.go b/vendor/sigs.k8s.io/controller-runtime/pkg/controller/controller.go index 9de959b48f6c..afa15aebec1b 100644 --- a/vendor/sigs.k8s.io/controller-runtime/pkg/controller/controller.go +++ b/vendor/sigs.k8s.io/controller-runtime/pkg/controller/controller.go @@ -91,8 +91,29 @@ type TypedOptions[request comparable] struct { // UsePriorityQueue configures the controllers queue to use the controller-runtime provided // priority queue. // - // Note: This flag is disabled by default until a future version. It's currently in beta. + // Note: This flag is disabled by default until a future version. This feature is currently in beta. + // For more details, see: https://github.com/kubernetes-sigs/controller-runtime/issues/2374. UsePriorityQueue *bool + + // EnableWarmup specifies whether the controller should start its sources when the manager is not + // the leader. This is useful for cases where sources take a long time to start, as it allows + // for the controller to warm up its caches even before it is elected as the leader. This + // improves leadership failover time, as the caches will be prepopulated before the controller + // transitions to be leader. + // + // Setting EnableWarmup to true and NeedLeaderElection to true means the controller will start its + // sources without waiting to become leader. + // Setting EnableWarmup to true and NeedLeaderElection to false is a no-op as controllers without + // leader election do not wait on leader election to start their sources. + // Defaults to false. + // + // Note: This feature is currently in beta and subject to change. + // For more details, see: https://github.com/kubernetes-sigs/controller-runtime/issues/3220. + EnableWarmup *bool + + // ReconciliationTimeout is used as the timeout passed to the context of each Reconcile call. + // By default, there is no timeout. 
+ ReconciliationTimeout time.Duration } // DefaultFromConfig defaults the config from a config.Controller @@ -124,6 +145,14 @@ func (options *TypedOptions[request]) DefaultFromConfig(config config.Controller if options.NeedLeaderElection == nil { options.NeedLeaderElection = config.NeedLeaderElection } + + if options.EnableWarmup == nil { + options.EnableWarmup = config.EnableWarmup + } + + if options.ReconciliationTimeout == 0 { + options.ReconciliationTimeout = config.ReconciliationTimeout + } } // Controller implements an API. A Controller manages a work queue fed reconcile.Requests @@ -243,7 +272,7 @@ func NewTypedUnmanaged[request comparable](name string, options TypedOptions[req } // Create controller with dependencies set - return &controller.Controller[request]{ + return controller.New[request](controller.Options[request]{ Do: options.Reconciler, RateLimiter: options.RateLimiter, NewQueue: options.NewQueue, @@ -253,7 +282,9 @@ func NewTypedUnmanaged[request comparable](name string, options TypedOptions[req LogConstructor: options.LogConstructor, RecoverPanic: options.RecoverPanic, LeaderElected: options.NeedLeaderElection, - }, nil + EnableWarmup: options.EnableWarmup, + ReconciliationTimeout: options.ReconciliationTimeout, + }), nil } // ReconcileIDFromContext gets the reconcileID from the current context. diff --git a/vendor/sigs.k8s.io/controller-runtime/pkg/controller/priorityqueue/priorityqueue.go b/vendor/sigs.k8s.io/controller-runtime/pkg/controller/priorityqueue/priorityqueue.go index c3f77a6f39f1..71363f0d1792 100644 --- a/vendor/sigs.k8s.io/controller-runtime/pkg/controller/priorityqueue/priorityqueue.go +++ b/vendor/sigs.k8s.io/controller-runtime/pkg/controller/priorityqueue/priorityqueue.go @@ -1,6 +1,7 @@ package priorityqueue import ( + "math" "sync" "sync/atomic" "time" @@ -19,7 +20,10 @@ import ( type AddOpts struct { After time.Duration RateLimited bool - Priority int + // Priority is the priority of the item. Higher values + // indicate higher priority. + // Defaults to zero if unset. + Priority *int } // PriorityQueue is a priority queue for a controller. It @@ -120,8 +124,8 @@ type priorityqueue[T comparable] struct { get chan item[T] // waiters is the number of routines blocked in Get, we use it to determine - // if we can push items. - waiters atomic.Int64 + // if we can push items. Every manipulation has to be protected with the lock. + waiters int64 // Configurable for testing now func() time.Time @@ -129,6 +133,10 @@ type priorityqueue[T comparable] struct { } func (w *priorityqueue[T]) AddWithOpts(o AddOpts, items ...T) { + if w.shutdown.Load() { + return + } + w.lock.Lock() defer w.lock.Unlock() @@ -150,7 +158,7 @@ func (w *priorityqueue[T]) AddWithOpts(o AddOpts, items ...T) { item := &item[T]{ Key: key, AddedCounter: w.addedCounter, - Priority: o.Priority, + Priority: ptr.Deref(o.Priority, 0), ReadyAt: readyAt, } w.items[key] = item @@ -165,12 +173,12 @@ func (w *priorityqueue[T]) AddWithOpts(o AddOpts, items ...T) { // The b-tree de-duplicates based on ordering and any change here // will affect the order - Just delete and re-add. item, _ := w.queue.Delete(w.items[key]) - if o.Priority > item.Priority { + if newPriority := ptr.Deref(o.Priority, 0); newPriority > item.Priority { // Update depth metric only if the item in the queue was already added to the depth metric. 
if item.ReadyAt == nil || w.becameReady.Has(key) { - w.metrics.updateDepthWithPriorityMetric(item.Priority, o.Priority) + w.metrics.updateDepthWithPriorityMetric(item.Priority, newPriority) } - item.Priority = o.Priority + item.Priority = newPriority } if item.ReadyAt != nil && (readyAt == nil || readyAt.Before(*item.ReadyAt)) { @@ -199,6 +207,7 @@ func (w *priorityqueue[T]) spin() { blockForever := make(chan time.Time) var nextReady <-chan time.Time nextReady = blockForever + var nextItemReadyAt time.Time for { select { @@ -206,10 +215,10 @@ func (w *priorityqueue[T]) spin() { return case <-w.itemOrWaiterAdded: case <-nextReady: + nextReady = blockForever + nextItemReadyAt = time.Time{} } - nextReady = blockForever - func() { w.lock.Lock() defer w.lock.Unlock() @@ -220,39 +229,67 @@ func (w *priorityqueue[T]) spin() { // manipulating the tree from within Ascend might lead to panics, so // track what we want to delete and do it after we are done ascending. var toDelete []*item[T] - w.queue.Ascend(func(item *item[T]) bool { - if item.ReadyAt != nil { - if readyAt := item.ReadyAt.Sub(w.now()); readyAt > 0 { - nextReady = w.tick(readyAt) - return false + + var key T + + // Items in the queue tree are sorted first by priority and second by readiness, so + // items with a lower priority might be ready further down in the queue. + // We iterate through the priorities high to low until we find a ready item + pivot := item[T]{ + Key: key, + AddedCounter: 0, + Priority: math.MaxInt, + ReadyAt: nil, + } + + for { + pivotChange := false + + w.queue.AscendGreaterOrEqual(&pivot, func(item *item[T]) bool { + // Item is locked, we can not hand it out + if w.locked.Has(item.Key) { + return true } - if !w.becameReady.Has(item.Key) { - w.metrics.add(item.Key, item.Priority) - w.becameReady.Insert(item.Key) + + if item.ReadyAt != nil { + if readyAt := item.ReadyAt.Sub(w.now()); readyAt > 0 { + if nextItemReadyAt.After(*item.ReadyAt) || nextItemReadyAt.IsZero() { + nextReady = w.tick(readyAt) + nextItemReadyAt = *item.ReadyAt + } + + // Adjusting the pivot item moves the ascend to the next lower priority + pivot.Priority = item.Priority - 1 + pivotChange = true + return false + } + if !w.becameReady.Has(item.Key) { + w.metrics.add(item.Key, item.Priority) + w.becameReady.Insert(item.Key) + } } - } - if w.waiters.Load() == 0 { - // Have to keep iterating here to ensure we update metrics - // for further items that became ready and set nextReady. - return true - } + if w.waiters == 0 { + // Have to keep iterating here to ensure we update metrics + // for further items that became ready and set nextReady. 
+ return true + } - // Item is locked, we can not hand it out - if w.locked.Has(item.Key) { - return true - } + w.metrics.get(item.Key, item.Priority) + w.locked.Insert(item.Key) + w.waiters-- + delete(w.items, item.Key) + toDelete = append(toDelete, item) + w.becameReady.Delete(item.Key) + w.get <- *item - w.metrics.get(item.Key, item.Priority) - w.locked.Insert(item.Key) - w.waiters.Add(-1) - delete(w.items, item.Key) - toDelete = append(toDelete, item) - w.becameReady.Delete(item.Key) - w.get <- *item + return true + }) - return true - }) + if !pivotChange { + break + } + } for _, item := range toDelete { w.queue.Delete(item) @@ -274,12 +311,29 @@ func (w *priorityqueue[T]) AddRateLimited(item T) { } func (w *priorityqueue[T]) GetWithPriority() (_ T, priority int, shutdown bool) { - w.waiters.Add(1) + if w.shutdown.Load() { + var zero T + return zero, 0, true + } + + w.lock.Lock() + w.waiters++ + w.lock.Unlock() w.notifyItemOrWaiterAdded() - item := <-w.get - return item.Key, item.Priority, w.shutdown.Load() + select { + case <-w.done: + // Return if the queue was shutdown while we were already waiting for an item here. + // For example controller workers are continuously calling GetWithPriority and + // GetWithPriority is blocking the workers if there are no items in the queue. + // If the controller and accordingly the queue is then shut down, without this code + // branch the controller workers remain blocked here and are unable to shut down. + var zero T + return zero, 0, true + case item := <-w.get: + return item.Key, item.Priority, w.shutdown.Load() + } } func (w *priorityqueue[T]) Get() (item T, shutdown bool) { @@ -365,6 +419,9 @@ func (w *priorityqueue[T]) logState() { } func less[T comparable](a, b *item[T]) bool { + if a.Priority != b.Priority { + return a.Priority > b.Priority + } if a.ReadyAt == nil && b.ReadyAt != nil { return true } @@ -374,9 +431,6 @@ func less[T comparable](a, b *item[T]) bool { if a.ReadyAt != nil && b.ReadyAt != nil && !a.ReadyAt.Equal(*b.ReadyAt) { return a.ReadyAt.Before(*b.ReadyAt) } - if a.Priority != b.Priority { - return a.Priority > b.Priority - } return a.AddedCounter < b.AddedCounter } @@ -404,4 +458,5 @@ type bTree[T any] interface { ReplaceOrInsert(item T) (_ T, _ bool) Delete(item T) (T, bool) Ascend(iterator btree.ItemIteratorG[T]) + AscendGreaterOrEqual(pivot T, iterator btree.ItemIteratorG[T]) } diff --git a/vendor/sigs.k8s.io/controller-runtime/pkg/handler/enqueue_mapped.go b/vendor/sigs.k8s.io/controller-runtime/pkg/handler/enqueue_mapped.go index fe78f21a2c89..62d67281512c 100644 --- a/vendor/sigs.k8s.io/controller-runtime/pkg/handler/enqueue_mapped.go +++ b/vendor/sigs.k8s.io/controller-runtime/pkg/handler/enqueue_mapped.go @@ -20,6 +20,7 @@ import ( "context" "k8s.io/client-go/util/workqueue" + "k8s.io/utils/ptr" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller/priorityqueue" "sigs.k8s.io/controller-runtime/pkg/event" @@ -141,7 +142,7 @@ func (e *enqueueRequestsFromMapFunc[object, request]) mapAndEnqueue( if !ok { if lowPriority { q.(priorityqueue.PriorityQueue[request]).AddWithOpts(priorityqueue.AddOpts{ - Priority: LowPriority, + Priority: ptr.To(LowPriority), }, req) } else { q.Add(req) diff --git a/vendor/sigs.k8s.io/controller-runtime/pkg/handler/eventhandler.go b/vendor/sigs.k8s.io/controller-runtime/pkg/handler/eventhandler.go index 29e755cbfaf2..88510d29eded 100644 --- a/vendor/sigs.k8s.io/controller-runtime/pkg/handler/eventhandler.go +++ 
b/vendor/sigs.k8s.io/controller-runtime/pkg/handler/eventhandler.go @@ -19,8 +19,10 @@ package handler import ( "context" "reflect" + "time" "k8s.io/client-go/util/workqueue" + "k8s.io/utils/ptr" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller/priorityqueue" "sigs.k8s.io/controller-runtime/pkg/event" @@ -126,20 +128,14 @@ func (h TypedFuncs[object, request]) Create(ctx context.Context, e event.TypedCr h.CreateFunc(ctx, e, q) return } - wq := workqueueWithCustomAddFunc[request]{ - TypedRateLimitingInterface: q, + + wq := workqueueWithDefaultPriority[request]{ // We already know that we have a priority queue, that event.Object implements // client.Object and that its not nil - addFunc: func(item request, q workqueue.TypedRateLimitingInterface[request]) { - var priority int - if e.IsInInitialList { - priority = LowPriority - } - q.(priorityqueue.PriorityQueue[request]).AddWithOpts( - priorityqueue.AddOpts{Priority: priority}, - item, - ) - }, + PriorityQueue: q.(priorityqueue.PriorityQueue[request]), + } + if e.IsInInitialList { + wq.priority = ptr.To(LowPriority) } h.CreateFunc(ctx, e, wq) } @@ -160,20 +156,13 @@ func (h TypedFuncs[object, request]) Update(ctx context.Context, e event.TypedUp return } - wq := workqueueWithCustomAddFunc[request]{ - TypedRateLimitingInterface: q, + wq := workqueueWithDefaultPriority[request]{ // We already know that we have a priority queue, that event.ObjectOld and ObjectNew implement // client.Object and that they are not nil - addFunc: func(item request, q workqueue.TypedRateLimitingInterface[request]) { - var priority int - if any(e.ObjectOld).(client.Object).GetResourceVersion() == any(e.ObjectNew).(client.Object).GetResourceVersion() { - priority = LowPriority - } - q.(priorityqueue.PriorityQueue[request]).AddWithOpts( - priorityqueue.AddOpts{Priority: priority}, - item, - ) - }, + PriorityQueue: q.(priorityqueue.PriorityQueue[request]), + } + if any(e.ObjectOld).(client.Object).GetResourceVersion() == any(e.ObjectNew).(client.Object).GetResourceVersion() { + wq.priority = ptr.To(LowPriority) } h.UpdateFunc(ctx, e, wq) } @@ -201,13 +190,28 @@ func WithLowPriorityWhenUnchanged[object client.Object, request comparable](u Ty } } -type workqueueWithCustomAddFunc[request comparable] struct { - workqueue.TypedRateLimitingInterface[request] - addFunc func(item request, q workqueue.TypedRateLimitingInterface[request]) +type workqueueWithDefaultPriority[request comparable] struct { + priorityqueue.PriorityQueue[request] + priority *int +} + +func (w workqueueWithDefaultPriority[request]) Add(item request) { + w.PriorityQueue.AddWithOpts(priorityqueue.AddOpts{Priority: w.priority}, item) } -func (w workqueueWithCustomAddFunc[request]) Add(item request) { - w.addFunc(item, w.TypedRateLimitingInterface) +func (w workqueueWithDefaultPriority[request]) AddAfter(item request, after time.Duration) { + w.PriorityQueue.AddWithOpts(priorityqueue.AddOpts{Priority: w.priority, After: after}, item) +} + +func (w workqueueWithDefaultPriority[request]) AddRateLimited(item request) { + w.PriorityQueue.AddWithOpts(priorityqueue.AddOpts{Priority: w.priority, RateLimited: true}, item) +} + +func (w workqueueWithDefaultPriority[request]) AddWithOpts(o priorityqueue.AddOpts, items ...request) { + if o.Priority == nil { + o.Priority = w.priority + } + w.PriorityQueue.AddWithOpts(o, items...) 
} // addToQueueCreate adds the reconcile.Request to the priorityqueue in the handler @@ -219,9 +223,9 @@ func addToQueueCreate[T client.Object, request comparable](q workqueue.TypedRate return } - var priority int + var priority *int if evt.IsInInitialList { - priority = LowPriority + priority = ptr.To(LowPriority) } priorityQueue.AddWithOpts(priorityqueue.AddOpts{Priority: priority}, item) } @@ -235,9 +239,9 @@ func addToQueueUpdate[T client.Object, request comparable](q workqueue.TypedRate return } - var priority int + var priority *int if evt.ObjectOld.GetResourceVersion() == evt.ObjectNew.GetResourceVersion() { - priority = LowPriority + priority = ptr.To(LowPriority) } priorityQueue.AddWithOpts(priorityqueue.AddOpts{Priority: priority}, item) } diff --git a/vendor/sigs.k8s.io/controller-runtime/pkg/internal/controller/controller.go b/vendor/sigs.k8s.io/controller-runtime/pkg/internal/controller/controller.go index 9fa7ec71e1ed..ea7968186233 100644 --- a/vendor/sigs.k8s.io/controller-runtime/pkg/internal/controller/controller.go +++ b/vendor/sigs.k8s.io/controller-runtime/pkg/internal/controller/controller.go @@ -30,6 +30,7 @@ import ( utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/uuid" "k8s.io/client-go/util/workqueue" + "k8s.io/utils/ptr" "sigs.k8s.io/controller-runtime/pkg/controller/priorityqueue" ctrlmetrics "sigs.k8s.io/controller-runtime/pkg/internal/controller/metrics" @@ -38,6 +39,55 @@ import ( "sigs.k8s.io/controller-runtime/pkg/source" ) +// Options are the arguments for creating a new Controller. +type Options[request comparable] struct { + // Reconciler is a function that can be called at any time with the Name / Namespace of an object and + // ensures that the state of the system matches the state specified in the object. + // Defaults to the DefaultReconcileFunc. + Do reconcile.TypedReconciler[request] + + // RateLimiter is used to limit how frequently requests may be queued into the work queue. + RateLimiter workqueue.TypedRateLimiter[request] + + // NewQueue constructs the queue for this controller once the controller is ready to start. + // This is a func because the standard Kubernetes work queues start themselves immediately, which + // leads to goroutine leaks if something calls controller.New repeatedly. + NewQueue func(controllerName string, rateLimiter workqueue.TypedRateLimiter[request]) workqueue.TypedRateLimitingInterface[request] + + // MaxConcurrentReconciles is the maximum number of concurrent Reconciles which can be run. Defaults to 1. + MaxConcurrentReconciles int + + // CacheSyncTimeout refers to the time limit set on waiting for cache to sync + // Defaults to 2 minutes if not set. + CacheSyncTimeout time.Duration + + // Name is used to uniquely identify a Controller in tracing, logging and monitoring. Name is required. + Name string + + // LogConstructor is used to construct a logger to then log messages to users during reconciliation, + // or for example when a watch is started. + // Note: LogConstructor has to be able to handle nil requests as we are also using it + // outside the context of a reconciliation. + LogConstructor func(request *request) logr.Logger + + // RecoverPanic indicates whether the panic caused by reconcile should be recovered. + // Defaults to true. + RecoverPanic *bool + + // LeaderElected indicates whether the controller is leader elected or always running. 
+ LeaderElected *bool + + // EnableWarmup specifies whether the controller should start its sources + // when the manager is not the leader. + // Defaults to false, which means that the controller will wait for leader election to start + // before starting sources. + EnableWarmup *bool + + // ReconciliationTimeout is used as the timeout passed to the context of each Reconcile call. + // By default, there is no timeout. + ReconciliationTimeout time.Duration +} + // Controller implements controller.Controller. type Controller[request comparable] struct { // Name is used to uniquely identify a Controller in tracing, logging and monitoring. Name is required. @@ -83,6 +133,14 @@ type Controller[request comparable] struct { // startWatches maintains a list of sources, handlers, and predicates to start when the controller is started. startWatches []source.TypedSource[request] + // startedEventSourcesAndQueue is used to track if the event sources have been started. + // It ensures that we append sources to c.startWatches only until we call Start() / Warmup() + // It is true if startEventSourcesAndQueueLocked has been called at least once. + startedEventSourcesAndQueue bool + + // didStartEventSourcesOnce is used to ensure that the event sources are only started once. + didStartEventSourcesOnce sync.Once + // LogConstructor is used to construct a logger to then log messages to users during reconciliation, // or for example when a watch is started. // Note: LogConstructor has to be able to handle nil requests as we are also using it @@ -95,6 +153,38 @@ type Controller[request comparable] struct { // LeaderElected indicates whether the controller is leader elected or always running. LeaderElected *bool + + // EnableWarmup specifies whether the controller should start its sources when the manager is not + // the leader. This is useful for cases where sources take a long time to start, as it allows + // for the controller to warm up its caches even before it is elected as the leader. This + // improves leadership failover time, as the caches will be prepopulated before the controller + // transitions to be leader. + // + // Setting EnableWarmup to true and NeedLeaderElection to true means the controller will start its + // sources without waiting to become leader. + // Setting EnableWarmup to true and NeedLeaderElection to false is a no-op as controllers without + // leader election do not wait on leader election to start their sources. + // Defaults to false. + EnableWarmup *bool + + ReconciliationTimeout time.Duration +} + +// New returns a new Controller configured with the given options. +func New[request comparable](options Options[request]) *Controller[request] { + return &Controller[request]{ + Do: options.Do, + RateLimiter: options.RateLimiter, + NewQueue: options.NewQueue, + MaxConcurrentReconciles: options.MaxConcurrentReconciles, + CacheSyncTimeout: options.CacheSyncTimeout, + Name: options.Name, + LogConstructor: options.LogConstructor, + RecoverPanic: options.RecoverPanic, + LeaderElected: options.LeaderElected, + EnableWarmup: options.EnableWarmup, + ReconciliationTimeout: options.ReconciliationTimeout, + } } // Reconcile implements reconcile.Reconciler. 
@@ -116,6 +206,13 @@ func (c *Controller[request]) Reconcile(ctx context.Context, req request) (_ rec panic(r) } }() + + if c.ReconciliationTimeout > 0 { + var cancel context.CancelFunc + ctx, cancel = context.WithTimeout(ctx, c.ReconciliationTimeout) + defer cancel() + } + return c.Do.Reconcile(ctx, req) } @@ -124,10 +221,9 @@ func (c *Controller[request]) Watch(src source.TypedSource[request]) error { c.mu.Lock() defer c.mu.Unlock() - // Controller hasn't started yet, store the watches locally and return. - // - // These watches are going to be held on the controller struct until the manager or user calls Start(...). - if !c.Started { + // Sources weren't started yet, store the watches locally and return. + // These sources are going to be held until either Warmup() or Start(...) is called. + if !c.startedEventSourcesAndQueue { c.startWatches = append(c.startWatches, src) return nil } @@ -144,6 +240,21 @@ func (c *Controller[request]) NeedLeaderElection() bool { return *c.LeaderElected } +// Warmup implements the manager.WarmupRunnable interface. +func (c *Controller[request]) Warmup(ctx context.Context) error { + if c.EnableWarmup == nil || !*c.EnableWarmup { + return nil + } + + c.mu.Lock() + defer c.mu.Unlock() + + // Set the ctx so later calls to watch use this internal context + c.ctx = ctx + + return c.startEventSourcesAndQueueLocked(ctx) +} + // Start implements controller.Controller. func (c *Controller[request]) Start(ctx context.Context) error { // use an IIFE to get proper lock handling @@ -158,17 +269,6 @@ func (c *Controller[request]) Start(ctx context.Context) error { // Set the internal context. c.ctx = ctx - queue := c.NewQueue(c.Name, c.RateLimiter) - if priorityQueue, isPriorityQueue := queue.(priorityqueue.PriorityQueue[request]); isPriorityQueue { - c.Queue = priorityQueue - } else { - c.Queue = &priorityQueueWrapper[request]{TypedRateLimitingInterface: queue} - } - go func() { - <-ctx.Done() - c.Queue.ShutDown() - }() - wg := &sync.WaitGroup{} err := func() error { defer c.mu.Unlock() @@ -179,18 +279,12 @@ func (c *Controller[request]) Start(ctx context.Context) error { // NB(directxman12): launch the sources *before* trying to wait for the // caches to sync so that they have a chance to register their intended // caches. - if err := c.startEventSources(ctx); err != nil { + if err := c.startEventSourcesAndQueueLocked(ctx); err != nil { return err } c.LogConstructor(nil).Info("Starting Controller") - // All the watches have been started, we can reset the local slice. - // - // We should never hold watches more than necessary, each watch source can hold a backing cache, - // which won't be garbage collected if we hold a reference to it. - c.startWatches = nil - // Launch workers to process resources c.LogConstructor(nil).Info("Starting workers", "worker count", c.MaxConcurrentReconciles) wg.Add(c.MaxConcurrentReconciles) @@ -218,63 +312,90 @@ func (c *Controller[request]) Start(ctx context.Context) error { return nil } -// startEventSources launches all the sources registered with this controller and waits +// startEventSourcesAndQueueLocked launches all the sources registered with this controller and waits // for them to sync. It returns an error if any of the sources fail to start or sync. 
-func (c *Controller[request]) startEventSources(ctx context.Context) error { - errGroup := &errgroup.Group{} - for _, watch := range c.startWatches { - log := c.LogConstructor(nil) - _, ok := watch.(interface { - String() string - }) - - if !ok { - log = log.WithValues("source", fmt.Sprintf("%T", watch)) +func (c *Controller[request]) startEventSourcesAndQueueLocked(ctx context.Context) error { + var retErr error + + c.didStartEventSourcesOnce.Do(func() { + queue := c.NewQueue(c.Name, c.RateLimiter) + if priorityQueue, isPriorityQueue := queue.(priorityqueue.PriorityQueue[request]); isPriorityQueue { + c.Queue = priorityQueue } else { - log = log.WithValues("source", fmt.Sprintf("%s", watch)) + c.Queue = &priorityQueueWrapper[request]{TypedRateLimitingInterface: queue} } - didStartSyncingSource := &atomic.Bool{} - errGroup.Go(func() error { - // Use a timeout for starting and syncing the source to avoid silently - // blocking startup indefinitely if it doesn't come up. - sourceStartCtx, cancel := context.WithTimeout(ctx, c.CacheSyncTimeout) - defer cancel() - - sourceStartErrChan := make(chan error, 1) // Buffer chan to not leak goroutine if we time out - go func() { - defer close(sourceStartErrChan) - log.Info("Starting EventSource") - if err := watch.Start(ctx, c.Queue); err != nil { - sourceStartErrChan <- err - return - } - syncingSource, ok := watch.(source.TypedSyncingSource[request]) - if !ok { - return - } - didStartSyncingSource.Store(true) - if err := syncingSource.WaitForSync(sourceStartCtx); err != nil { - err := fmt.Errorf("failed to wait for %s caches to sync %v: %w", c.Name, syncingSource, err) - log.Error(err, "Could not wait for Cache to sync") - sourceStartErrChan <- err + go func() { + <-ctx.Done() + c.Queue.ShutDown() + }() + + errGroup := &errgroup.Group{} + for _, watch := range c.startWatches { + log := c.LogConstructor(nil) + _, ok := watch.(interface { + String() string + }) + if !ok { + log = log.WithValues("source", fmt.Sprintf("%T", watch)) + } else { + log = log.WithValues("source", fmt.Sprintf("%s", watch)) + } + didStartSyncingSource := &atomic.Bool{} + errGroup.Go(func() error { + // Use a timeout for starting and syncing the source to avoid silently + // blocking startup indefinitely if it doesn't come up. + sourceStartCtx, cancel := context.WithTimeout(ctx, c.CacheSyncTimeout) + defer cancel() + + sourceStartErrChan := make(chan error, 1) // Buffer chan to not leak goroutine if we time out + go func() { + defer close(sourceStartErrChan) + log.Info("Starting EventSource") + + if err := watch.Start(ctx, c.Queue); err != nil { + sourceStartErrChan <- err + return + } + syncingSource, ok := watch.(source.TypedSyncingSource[request]) + if !ok { + return + } + didStartSyncingSource.Store(true) + if err := syncingSource.WaitForSync(sourceStartCtx); err != nil { + err := fmt.Errorf("failed to wait for %s caches to sync %v: %w", c.Name, syncingSource, err) + log.Error(err, "Could not wait for Cache to sync") + sourceStartErrChan <- err + } + }() + + select { + case err := <-sourceStartErrChan: + return err + case <-sourceStartCtx.Done(): + if didStartSyncingSource.Load() { // We are racing with WaitForSync, wait for it to let it tell us what happened + return <-sourceStartErrChan + } + if ctx.Err() != nil { // Don't return an error if the root context got cancelled + return nil + } + return fmt.Errorf("timed out waiting for source %s to Start. 
Please ensure that its Start() method is non-blocking", watch) } - }() + }) + } + retErr = errGroup.Wait() - select { - case err := <-sourceStartErrChan: - return err - case <-sourceStartCtx.Done(): - if didStartSyncingSource.Load() { // We are racing with WaitForSync, wait for it to let it tell us what happened - return <-sourceStartErrChan - } - if ctx.Err() != nil { // Don't return an error if the root context got cancelled - return nil - } - return fmt.Errorf("timed out waiting for source %s to Start. Please ensure that its Start() method is non-blocking", watch) - } - }) - } - return errGroup.Wait() + // All the watches have been started, we can reset the local slice. + // + // We should never hold watches more than necessary, each watch source can hold a backing cache, + // which won't be garbage collected if we hold a reference to it. + c.startWatches = nil + + // Mark event sources as started after resetting the startWatches slice so that watches from + // a new Watch() call are immediately started. + c.startedEventSourcesAndQueue = true + }) + + return retErr } // processNextWorkItem will read a single work item off the workqueue and @@ -343,7 +464,7 @@ func (c *Controller[request]) reconcileHandler(ctx context.Context, req request, if errors.Is(err, reconcile.TerminalError(nil)) { ctrlmetrics.TerminalReconcileErrors.WithLabelValues(c.Name).Inc() } else { - c.Queue.AddWithOpts(priorityqueue.AddOpts{RateLimited: true, Priority: priority}, req) + c.Queue.AddWithOpts(priorityqueue.AddOpts{RateLimited: true, Priority: ptr.To(priority)}, req) } ctrlmetrics.ReconcileErrors.WithLabelValues(c.Name).Inc() ctrlmetrics.ReconcileTotal.WithLabelValues(c.Name, labelError).Inc() @@ -358,11 +479,11 @@ func (c *Controller[request]) reconcileHandler(ctx context.Context, req request, // We need to drive to stable reconcile loops before queuing due // to result.RequestAfter c.Queue.Forget(req) - c.Queue.AddWithOpts(priorityqueue.AddOpts{After: result.RequeueAfter, Priority: priority}, req) + c.Queue.AddWithOpts(priorityqueue.AddOpts{After: result.RequeueAfter, Priority: ptr.To(priority)}, req) ctrlmetrics.ReconcileTotal.WithLabelValues(c.Name, labelRequeueAfter).Inc() case result.Requeue: //nolint: staticcheck // We have to handle it until it is removed log.V(5).Info("Reconcile done, requeueing") - c.Queue.AddWithOpts(priorityqueue.AddOpts{RateLimited: true, Priority: priority}, req) + c.Queue.AddWithOpts(priorityqueue.AddOpts{RateLimited: true, Priority: ptr.To(priority)}, req) ctrlmetrics.ReconcileTotal.WithLabelValues(c.Name, labelRequeue).Inc() default: log.V(5).Info("Reconcile successful") diff --git a/vendor/sigs.k8s.io/controller-runtime/pkg/leaderelection/leader_election.go b/vendor/sigs.k8s.io/controller-runtime/pkg/leaderelection/leader_election.go index 5cc253917a3b..6c013e7992a8 100644 --- a/vendor/sigs.k8s.io/controller-runtime/pkg/leaderelection/leader_election.go +++ b/vendor/sigs.k8s.io/controller-runtime/pkg/leaderelection/leader_election.go @@ -56,6 +56,10 @@ type Options struct { // Without that, a single slow response from the API server can result // in losing leadership. RenewDeadline time.Duration + + // LeaderLabels are an optional set of labels that will be set on the lease object + // when this replica becomes leader + LeaderLabels map[string]string } // NewResourceLock creates a new resource lock for use in a leader election loop. 
@@ -63,7 +67,6 @@ func NewResourceLock(config *rest.Config, recorderProvider recorder.Provider, op if !options.LeaderElection { return nil, nil } - // Default resource lock to "leases". The previous default (from v0.7.0 to v0.11.x) was configmapsleases, which was // used to migrate from configmaps to leases. Since the default was "configmapsleases" for over a year, spanning // five minor releases, any actively maintained operators are very likely to have a released version that uses @@ -93,22 +96,21 @@ func NewResourceLock(config *rest.Config, recorderProvider recorder.Provider, op } id = id + "_" + string(uuid.NewUUID()) - // Construct clients for leader election - rest.AddUserAgent(config, "leader-election") + // Construct config for leader election + config = rest.AddUserAgent(config, "leader-election") + // Timeout set for a client used to contact to Kubernetes should be lower than + // RenewDeadline to keep a single hung request from forcing a leader loss. + // Setting it to max(time.Second, RenewDeadline/2) as a reasonable heuristic. if options.RenewDeadline != 0 { - return resourcelock.NewFromKubeconfig(options.LeaderElectionResourceLock, - options.LeaderElectionNamespace, - options.LeaderElectionID, - resourcelock.ResourceLockConfig{ - Identity: id, - EventRecorder: recorderProvider.GetEventRecorderFor(id), - }, - config, - options.RenewDeadline, - ) + timeout := options.RenewDeadline / 2 + if timeout < time.Second { + timeout = time.Second + } + config.Timeout = timeout } + // Construct clients for leader election corev1Client, err := corev1client.NewForConfig(config) if err != nil { return nil, err @@ -118,7 +120,8 @@ func NewResourceLock(config *rest.Config, recorderProvider recorder.Provider, op if err != nil { return nil, err } - return resourcelock.New(options.LeaderElectionResourceLock, + + return resourcelock.NewWithLabels(options.LeaderElectionResourceLock, options.LeaderElectionNamespace, options.LeaderElectionID, corev1Client, @@ -127,6 +130,7 @@ func NewResourceLock(config *rest.Config, recorderProvider recorder.Provider, op Identity: id, EventRecorder: recorderProvider.GetEventRecorderFor(id), }, + options.LeaderLabels, ) } diff --git a/vendor/sigs.k8s.io/controller-runtime/pkg/manager/internal.go b/vendor/sigs.k8s.io/controller-runtime/pkg/manager/internal.go index e5204a7506b9..a9f91cbdd56b 100644 --- a/vendor/sigs.k8s.io/controller-runtime/pkg/manager/internal.go +++ b/vendor/sigs.k8s.io/controller-runtime/pkg/manager/internal.go @@ -439,6 +439,11 @@ func (cm *controllerManager) Start(ctx context.Context) (err error) { return fmt.Errorf("failed to start other runnables: %w", err) } + // Start WarmupRunnables and wait for warmup to complete. + if err := cm.runnables.Warmup.Start(cm.internalCtx); err != nil { + return fmt.Errorf("failed to start warmup runnables: %w", err) + } + // Start the leader election and all required runnables. { ctx, cancel := context.WithCancel(context.Background()) @@ -534,6 +539,18 @@ func (cm *controllerManager) engageStopProcedure(stopComplete <-chan struct{}) e }() go func() { + go func() { + // Stop the warmup runnables in a separate goroutine to avoid blocking. + // It is important to stop the warmup runnables in parallel with the other runnables + // since we cannot assume ordering of whether or not one of the warmup runnables or one + // of the other runnables is holding a lock. 
+ // Cancelling the wrong runnable (one that is not holding the lock) will cause the + // shutdown sequence to block indefinitely as it will wait for the runnable that is + // holding the lock to finish. + cm.logger.Info("Stopping and waiting for warmup runnables") + cm.runnables.Warmup.StopAndWait(cm.shutdownCtx) + }() + // First stop the non-leader election runnables. cm.logger.Info("Stopping and waiting for non leader election runnables") cm.runnables.Others.StopAndWait(cm.shutdownCtx) diff --git a/vendor/sigs.k8s.io/controller-runtime/pkg/manager/manager.go b/vendor/sigs.k8s.io/controller-runtime/pkg/manager/manager.go index c3ae317b048d..e0e94245e77c 100644 --- a/vendor/sigs.k8s.io/controller-runtime/pkg/manager/manager.go +++ b/vendor/sigs.k8s.io/controller-runtime/pkg/manager/manager.go @@ -201,10 +201,15 @@ type Options struct { // LeaseDuration time first. LeaderElectionReleaseOnCancel bool + // LeaderElectionLabels allows a controller to supplement all leader election api calls with a set of custom labels based on + // the replica attempting to acquire leader status. + LeaderElectionLabels map[string]string + // LeaderElectionResourceLockInterface allows to provide a custom resourcelock.Interface that was created outside // of the controller-runtime. If this value is set the options LeaderElectionID, LeaderElectionNamespace, - // LeaderElectionResourceLock, LeaseDuration, RenewDeadline and RetryPeriod will be ignored. This can be useful if you - // want to use a locking mechanism that is currently not supported, like a MultiLock across two Kubernetes clusters. + // LeaderElectionResourceLock, LeaseDuration, RenewDeadline, RetryPeriod and LeaderElectionLeases will be ignored. + // This can be useful if you want to use a locking mechanism that is currently not supported, like a MultiLock across + // two Kubernetes clusters. LeaderElectionResourceLockInterface resourcelock.Interface // LeaseDuration is the duration that non-leader candidates will @@ -314,6 +319,15 @@ type LeaderElectionRunnable interface { NeedLeaderElection() bool } +// warmupRunnable knows if a Runnable requires warmup. A warmup runnable is a runnable +// that should be run when the manager is started but before it becomes leader. +// Note: Implementing this interface is only useful when LeaderElection can be enabled, as the +// behavior when leaderelection is not enabled is to run LeaderElectionRunnables immediately. +type warmupRunnable interface { + // Warmup will be called when the manager is started but before it becomes leader. + Warmup(context.Context) error +} + // New returns a new Manager for creating Controllers. 
// Note that if ContentType in the given config is not set, "application/vnd.kubernetes.protobuf" // will be used for all built-in resources of Kubernetes, and "application/json" is for other types @@ -390,6 +404,7 @@ func New(config *rest.Config, options Options) (Manager, error) { LeaderElectionID: options.LeaderElectionID, LeaderElectionNamespace: options.LeaderElectionNamespace, RenewDeadline: *options.RenewDeadline, + LeaderLabels: options.LeaderElectionLabels, }) if err != nil { return nil, err @@ -417,7 +432,7 @@ func New(config *rest.Config, options Options) (Manager, error) { } errChan := make(chan error, 1) - runnables := newRunnables(options.BaseContext, errChan) + runnables := newRunnables(options.BaseContext, errChan).withLogger(options.Logger) return &controllerManager{ stopProcedureEngaged: ptr.To(int64(0)), cluster: cluster, diff --git a/vendor/sigs.k8s.io/controller-runtime/pkg/manager/runnable_group.go b/vendor/sigs.k8s.io/controller-runtime/pkg/manager/runnable_group.go index db5cda7c88f6..53e29fc56fad 100644 --- a/vendor/sigs.k8s.io/controller-runtime/pkg/manager/runnable_group.go +++ b/vendor/sigs.k8s.io/controller-runtime/pkg/manager/runnable_group.go @@ -5,6 +5,7 @@ import ( "errors" "sync" + "github.com/go-logr/logr" "sigs.k8s.io/controller-runtime/pkg/webhook" ) @@ -32,6 +33,7 @@ type runnables struct { Webhooks *runnableGroup Caches *runnableGroup LeaderElection *runnableGroup + Warmup *runnableGroup Others *runnableGroup } @@ -42,10 +44,21 @@ func newRunnables(baseContext BaseContextFunc, errChan chan error) *runnables { Webhooks: newRunnableGroup(baseContext, errChan), Caches: newRunnableGroup(baseContext, errChan), LeaderElection: newRunnableGroup(baseContext, errChan), + Warmup: newRunnableGroup(baseContext, errChan), Others: newRunnableGroup(baseContext, errChan), } } +// withLogger returns the runnables with the logger set for all runnable groups. +func (r *runnables) withLogger(logger logr.Logger) *runnables { + r.HTTPServers.withLogger(logger) + r.Webhooks.withLogger(logger) + r.Caches.withLogger(logger) + r.LeaderElection.withLogger(logger) + r.Others.withLogger(logger) + return r +} + // Add adds a runnable to closest group of runnable that they belong to. // // Add should be able to be called before and after Start, but not after StopAndWait. @@ -65,8 +78,20 @@ func (r *runnables) Add(fn Runnable) error { }) case webhook.Server: return r.Webhooks.Add(fn, nil) - case LeaderElectionRunnable: - if !runnable.NeedLeaderElection() { + case warmupRunnable, LeaderElectionRunnable: + if warmupRunnable, ok := fn.(warmupRunnable); ok { + if err := r.Warmup.Add(RunnableFunc(warmupRunnable.Warmup), nil); err != nil { + return err + } + } + + leaderElectionRunnable, ok := fn.(LeaderElectionRunnable) + if !ok { + // If the runnable is not a LeaderElectionRunnable, add it to the leader election group for backwards compatibility + return r.LeaderElection.Add(fn, nil) + } + + if !leaderElectionRunnable.NeedLeaderElection() { return r.Others.Add(fn, nil) } return r.LeaderElection.Add(fn, nil) @@ -105,6 +130,9 @@ type runnableGroup struct { // wg is an internal sync.WaitGroup that allows us to properly stop // and wait for all the runnables to finish before returning. 
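// Illustrative sketch, not part of this patch: setting the new LeaderElectionLabels
// manager option so the leader election Lease carries extra, operator-defined labels
// once this replica becomes leader. The label key/value below are placeholders, not
// anything mandated by controller-runtime.
package example

import (
	"k8s.io/client-go/rest"
	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/manager"
)

func newExampleManager(cfg *rest.Config) (manager.Manager, error) {
	return ctrl.NewManager(cfg, manager.Options{
		LeaderElection:          true,
		LeaderElectionID:        "example-controller-lock",
		LeaderElectionNamespace: "kube-system",
		// New in v0.22: stamped onto the Lease object when leadership is acquired.
		LeaderElectionLabels: map[string]string{
			"app.kubernetes.io/name": "example-controller",
		},
	})
}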
wg *sync.WaitGroup + + // logger is used for logging when errors are dropped during shutdown + logger logr.Logger } func newRunnableGroup(baseContext BaseContextFunc, errChan chan error) *runnableGroup { @@ -113,12 +141,18 @@ func newRunnableGroup(baseContext BaseContextFunc, errChan chan error) *runnable errChan: errChan, ch: make(chan *readyRunnable), wg: new(sync.WaitGroup), + logger: logr.Discard(), // Default to no-op logger } r.ctx, r.cancel = context.WithCancel(baseContext()) return r } +// withLogger sets the logger for this runnable group. +func (r *runnableGroup) withLogger(logger logr.Logger) { + r.logger = logger +} + // Started returns true if the group has started. func (r *runnableGroup) Started() bool { r.start.Lock() @@ -224,7 +258,27 @@ func (r *runnableGroup) reconcile() { // Start the runnable. if err := rn.Start(r.ctx); err != nil { - r.errChan <- err + // Check if we're during the shutdown process. + r.stop.RLock() + isStopped := r.stopped + r.stop.RUnlock() + + if isStopped { + // During shutdown, try to send error first (error drain goroutine might still be running) + // but drop if it would block to prevent goroutine leaks + select { + case r.errChan <- err: + // Error sent successfully (error drain goroutine is still running) + default: + // Error drain goroutine has exited, drop error to prevent goroutine leak + if !errors.Is(err, context.Canceled) { // don't log context.Canceled errors as they are expected during shutdown + r.logger.Info("error dropped during shutdown to prevent goroutine leak", "error", err) + } + } + } else { + // During normal operation, always try to send errors (may block briefly) + r.errChan <- err + } } }(runnable) } diff --git a/vendor/sigs.k8s.io/controller-runtime/pkg/manager/server.go b/vendor/sigs.k8s.io/controller-runtime/pkg/manager/server.go index 76f6165b5323..1983165da873 100644 --- a/vendor/sigs.k8s.io/controller-runtime/pkg/manager/server.go +++ b/vendor/sigs.k8s.io/controller-runtime/pkg/manager/server.go @@ -70,7 +70,7 @@ func (s *Server) Start(ctx context.Context) error { shutdownCtx := context.Background() if s.ShutdownTimeout != nil { var shutdownCancel context.CancelFunc - shutdownCtx, shutdownCancel = context.WithTimeout(context.Background(), *s.ShutdownTimeout) + shutdownCtx, shutdownCancel = context.WithTimeout(shutdownCtx, *s.ShutdownTimeout) defer shutdownCancel() } diff --git a/vendor/sigs.k8s.io/controller-runtime/pkg/predicate/predicate.go b/vendor/sigs.k8s.io/controller-runtime/pkg/predicate/predicate.go index ce33975f3b5b..9f24cb178cb3 100644 --- a/vendor/sigs.k8s.io/controller-runtime/pkg/predicate/predicate.go +++ b/vendor/sigs.k8s.io/controller-runtime/pkg/predicate/predicate.go @@ -47,13 +47,15 @@ type TypedPredicate[object any] interface { Generic(event.TypedGenericEvent[object]) bool } -var _ Predicate = Funcs{} -var _ Predicate = ResourceVersionChangedPredicate{} -var _ Predicate = GenerationChangedPredicate{} -var _ Predicate = AnnotationChangedPredicate{} -var _ Predicate = or[client.Object]{} -var _ Predicate = and[client.Object]{} -var _ Predicate = not[client.Object]{} +var ( + _ Predicate = Funcs{} + _ Predicate = ResourceVersionChangedPredicate{} + _ Predicate = GenerationChangedPredicate{} + _ Predicate = AnnotationChangedPredicate{} + _ Predicate = or[client.Object]{} + _ Predicate = and[client.Object]{} + _ Predicate = not[client.Object]{} +) // Funcs is a function that implements Predicate. 
type Funcs = TypedFuncs[client.Object] @@ -259,11 +261,10 @@ func (TypedAnnotationChangedPredicate[object]) Update(e event.TypedUpdateEvent[o // This predicate will skip update events that have no change in the object's label. // It is intended to be used in conjunction with the GenerationChangedPredicate, as in the following example: // -// Controller.Watch( -// -// &source.Kind{Type: v1.MyCustomKind}, -// &handler.EnqueueRequestForObject{}, -// predicate.Or(predicate.GenerationChangedPredicate{}, predicate.LabelChangedPredicate{})) +// Controller.Watch( +// &source.Kind{Type: v1.MyCustomKind}, +// &handler.EnqueueRequestForObject{}, +// predicate.Or(predicate.GenerationChangedPredicate{}, predicate.LabelChangedPredicate{})) // // This will be helpful when object's labels is carrying some extra specification information beyond object's spec, // and the controller will be triggered if any valid spec change (not only in spec, but also in labels) happens. diff --git a/vendor/sigs.k8s.io/controller-runtime/pkg/webhook/conversion/conversion.go b/vendor/sigs.k8s.io/controller-runtime/pkg/webhook/conversion/conversion.go index 249a364b3819..a26fa348bb70 100644 --- a/vendor/sigs.k8s.io/controller-runtime/pkg/webhook/conversion/conversion.go +++ b/vendor/sigs.k8s.io/controller-runtime/pkg/webhook/conversion/conversion.go @@ -22,7 +22,9 @@ See pkg/conversion for interface definitions required to ensure an API Type is c package conversion import ( + "context" "encoding/json" + "errors" "fmt" "net/http" @@ -31,8 +33,10 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" "sigs.k8s.io/controller-runtime/pkg/conversion" logf "sigs.k8s.io/controller-runtime/pkg/log" + conversionmetrics "sigs.k8s.io/controller-runtime/pkg/webhook/conversion/metrics" ) var ( @@ -53,6 +57,8 @@ type webhook struct { var _ http.Handler = &webhook{} func (wh *webhook) ServeHTTP(w http.ResponseWriter, r *http.Request) { + ctx := r.Context() + convertReview := &apix.ConversionReview{} err := json.NewDecoder(r.Body).Decode(convertReview) if err != nil { @@ -69,7 +75,7 @@ func (wh *webhook) ServeHTTP(w http.ResponseWriter, r *http.Request) { // TODO(droot): may be move the conversion logic to a separate module to // decouple it from the http layer ? - resp, err := wh.handleConvertRequest(convertReview.Request) + resp, err := wh.handleConvertRequest(ctx, convertReview.Request) if err != nil { log.Error(err, "failed to convert", "request", convertReview.Request.UID) convertReview.Response = errored(err) @@ -87,7 +93,18 @@ func (wh *webhook) ServeHTTP(w http.ResponseWriter, r *http.Request) { } // handles a version conversion request. 
-func (wh *webhook) handleConvertRequest(req *apix.ConversionRequest) (*apix.ConversionResponse, error) { +func (wh *webhook) handleConvertRequest(ctx context.Context, req *apix.ConversionRequest) (_ *apix.ConversionResponse, retErr error) { + defer func() { + if r := recover(); r != nil { + conversionmetrics.WebhookPanics.WithLabelValues().Inc() + + for _, fn := range utilruntime.PanicHandlers { + fn(ctx, r) + } + retErr = errors.New("internal error occurred during conversion") + return + } + }() if req == nil { return nil, fmt.Errorf("conversion request is nil") } diff --git a/vendor/sigs.k8s.io/controller-runtime/pkg/webhook/conversion/metrics/metrics.go b/vendor/sigs.k8s.io/controller-runtime/pkg/webhook/conversion/metrics/metrics.go new file mode 100644 index 000000000000..c825f17f0b9e --- /dev/null +++ b/vendor/sigs.k8s.io/controller-runtime/pkg/webhook/conversion/metrics/metrics.go @@ -0,0 +1,39 @@ +/* +Copyright 2025 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package metrics + +import ( + "github.com/prometheus/client_golang/prometheus" + "sigs.k8s.io/controller-runtime/pkg/metrics" +) + +var ( + // WebhookPanics is a prometheus counter metrics which holds the total + // number of panics from conversion webhooks. + WebhookPanics = prometheus.NewCounterVec(prometheus.CounterOpts{ + Name: "controller_runtime_conversion_webhook_panics_total", + Help: "Total number of conversion webhook panics", + }, []string{}) +) + +func init() { + metrics.Registry.MustRegister( + WebhookPanics, + ) + // Init metric. + WebhookPanics.WithLabelValues().Add(0) +} From d89c54494338d3c1d4ce7a58cfd4f3b7b236d69a Mon Sep 17 00:00:00 2001 From: RainbowMango Date: Mon, 17 Nov 2025 12:19:16 +0800 Subject: [PATCH 2/2] Adopt controller-runtime change PR3229: fake client cleared apiversion/kind Signed-off-by: RainbowMango --- pkg/controllers/execution/execution_controller.go | 10 ++++++++++ pkg/controllers/namespace/namespace_sync_controller.go | 8 ++++++++ 2 files changed, 18 insertions(+) diff --git a/pkg/controllers/execution/execution_controller.go b/pkg/controllers/execution/execution_controller.go index 28515c11c892..5c8344b3716a 100644 --- a/pkg/controllers/execution/execution_controller.go +++ b/pkg/controllers/execution/execution_controller.go @@ -358,6 +358,16 @@ func (c *Controller) updateWorkDispatchingConditionIfNeeded(ctx context.Context, return err } + if work.GetObjectKind().GroupVersionKind().Empty() { + // In unit tests, we need to verify how many events are emitted. However, when using the fake client, + // the work object may not have TypeMeta (APIVersion/Kind) set, which prevents building a proper + // object reference and thus no event is actually emitted. This workaround ensures the GVK is set + // so event emission, and thereby event counting in unit tests, works as expected. + // Since controller-runtime v0.22.0, the group version kind is not set when using fake client. + // See https://github.com/kubernetes-sigs/controller-runtime/pull/3229 for more details. 
+ work.SetGroupVersionKind(workv1alpha1.SchemeGroupVersion.WithKind("Work")) + } + obj, err := helper.ToUnstructured(work) if err != nil { return err diff --git a/pkg/controllers/namespace/namespace_sync_controller.go b/pkg/controllers/namespace/namespace_sync_controller.go index 51915b318d30..f89962603551 100644 --- a/pkg/controllers/namespace/namespace_sync_controller.go +++ b/pkg/controllers/namespace/namespace_sync_controller.go @@ -82,6 +82,14 @@ func (c *Controller) Reconcile(ctx context.Context, req controllerruntime.Reques return controllerruntime.Result{}, err } + if namespace.GetObjectKind().GroupVersionKind().Empty() { + // In unit tests, we need to verify that a work object is created for the corresponding namespace. + // However, the fake client does not set TypeMeta information, which results in an incorrect generated work object name. + // This ensures correct GVK is set so work name generation in tests works as expected. + // Since controller-runtime v0.22.0, the group version kind is not set when using fake client. + // See https://github.com/kubernetes-sigs/controller-runtime/pull/3229 for more details. + namespace.SetGroupVersionKind(corev1.SchemeGroupVersion.WithKind("Namespace")) + } if !namespace.DeletionTimestamp.IsZero() { // Do nothing, just return as we have added owner reference to Work.
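// Illustrative sketch, not part of this patch: the fake-client behaviour that the two
// workarounds above compensate for. Since controller-runtime v0.22 (see PR 3229),
// objects returned by the fake client no longer carry TypeMeta, so their GVK is empty
// unless the caller sets it explicitly.
package example

import (
	"context"
	"testing"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/types"
	clientgoscheme "k8s.io/client-go/kubernetes/scheme"
	"sigs.k8s.io/controller-runtime/pkg/client/fake"
)

func TestFakeClientLeavesGVKEmpty(t *testing.T) {
	ns := &corev1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: "demo"}}
	c := fake.NewClientBuilder().WithScheme(clientgoscheme.Scheme).WithObjects(ns).Build()

	got := &corev1.Namespace{}
	if err := c.Get(context.TODO(), types.NamespacedName{Name: "demo"}, got); err != nil {
		t.Fatal(err)
	}
	// Controllers that build object references or derive names from the GVK must now
	// set it themselves, as the two hunks above do before using the fetched object.
	if !got.GetObjectKind().GroupVersionKind().Empty() {
		t.Fatalf("expected empty GVK, got %v", got.GetObjectKind().GroupVersionKind())
	}
}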