diff --git a/pkg/cluster/cluster.go b/pkg/cluster/cluster.go index 0603f4cde5..ee14638c3f 100644 --- a/pkg/cluster/cluster.go +++ b/pkg/cluster/cluster.go @@ -19,13 +19,16 @@ package cluster import ( "context" "errors" + "fmt" "net/http" "github.com/go-logr/logr" "k8s.io/apimachinery/pkg/api/meta" "k8s.io/apimachinery/pkg/runtime" "k8s.io/client-go/kubernetes/scheme" + eventsv1client "k8s.io/client-go/kubernetes/typed/events/v1" "k8s.io/client-go/rest" + "k8s.io/client-go/tools/events" "k8s.io/client-go/tools/record" "sigs.k8s.io/controller-runtime/pkg/cache" @@ -33,10 +36,13 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client/apiutil" logf "sigs.k8s.io/controller-runtime/pkg/internal/log" intrec "sigs.k8s.io/controller-runtime/pkg/internal/recorder" + "sigs.k8s.io/controller-runtime/pkg/recorder" ) // Cluster provides various methods to interact with a cluster. type Cluster interface { + recorder.Provider + // GetHTTPClient returns an HTTP client that can be used to talk to the apiserver GetHTTPClient() *http.Client @@ -58,9 +64,6 @@ type Cluster interface { // GetFieldIndexer returns a client.FieldIndexer configured with the client GetFieldIndexer() client.FieldIndexer - // GetEventRecorderFor returns a new EventRecorder for the provided name - GetEventRecorderFor(name string) record.EventRecorder - // GetRESTMapper returns a RESTMapper GetRESTMapper() meta.RESTMapper @@ -160,8 +163,7 @@ func New(config *rest.Config, opts ...Option) (Cluster, error) { } options, err := setOptionsDefaults(options, config) if err != nil { - options.Logger.Error(err, "Failed to set defaults") - return nil, err + return nil, fmt.Errorf("failed setting cluster default options: %w", err) } // Create the mapper provider @@ -281,16 +283,24 @@ func setOptionsDefaults(options Options, config *rest.Config) (Options, error) { options.newRecorderProvider = intrec.NewProvider } + // This is duplicated with pkg/manager, we need it here to provide + // the user with an EventBroadcaster and there for the Leader election + evtCl, err := eventsv1client.NewForConfigAndClient(config, options.HTTPClient) + if err != nil { + return options, err + } + // This is duplicated with pkg/manager, we need it here to provide // the user with an EventBroadcaster and there for the Leader election if options.EventBroadcaster == nil { // defer initialization to avoid leaking by default - options.makeBroadcaster = func() (record.EventBroadcaster, bool) { - return record.NewBroadcaster(), true + options.makeBroadcaster = func() (record.EventBroadcaster, events.EventBroadcaster, bool) { + return record.NewBroadcaster(), events.NewBroadcaster(&events.EventSinkImpl{Interface: evtCl}), true } } else { - options.makeBroadcaster = func() (record.EventBroadcaster, bool) { - return options.EventBroadcaster, false + // keep supporting the options.EventBroadcaster in the old API, but do not introduce it for the new one. + options.makeBroadcaster = func() (record.EventBroadcaster, events.EventBroadcaster, bool) { + return options.EventBroadcaster, events.NewBroadcaster(&events.EventSinkImpl{Interface: evtCl}), false } } diff --git a/pkg/cluster/cluster_test.go b/pkg/cluster/cluster_test.go index c08a742403..c275ff0bf5 100644 --- a/pkg/cluster/cluster_test.go +++ b/pkg/cluster/cluster_test.go @@ -40,7 +40,6 @@ var _ = Describe("cluster.Cluster", func() { c, err := New(nil) Expect(c).To(BeNil()) Expect(err.Error()).To(ContainSubstring("must specify Config")) - }) It("should return an error if it can't create a RestMapper", func() { @@ -50,7 +49,6 @@ var _ = Describe("cluster.Cluster", func() { }) Expect(c).To(BeNil()) Expect(err).To(Equal(expected)) - }) It("should return an error it can't create a client.Client", func() { @@ -96,7 +94,6 @@ var _ = Describe("cluster.Cluster", func() { Expect(err).To(HaveOccurred()) Expect(err.Error()).To(ContainSubstring("expected error")) }) - }) Describe("Start", func() { @@ -160,7 +157,13 @@ var _ = Describe("cluster.Cluster", func() { It("should provide a function to get the EventRecorder", func() { c, err := New(cfg) Expect(err).NotTo(HaveOccurred()) - Expect(c.GetEventRecorderFor("test")).NotTo(BeNil()) + Expect(c.GetEventRecorder("test")).NotTo(BeNil()) + }) + + It("should provide a function to get the deprecated EventRecorder", func() { + c, err := New(cfg) + Expect(err).NotTo(HaveOccurred()) + Expect(c.GetEventRecorderFor("test")).NotTo(BeNil()) //nolint:staticcheck }) It("should provide a function to get the APIReader", func() { c, err := New(cfg) diff --git a/pkg/cluster/internal.go b/pkg/cluster/internal.go index 2742764231..755f83b546 100644 --- a/pkg/cluster/internal.go +++ b/pkg/cluster/internal.go @@ -24,6 +24,7 @@ import ( "k8s.io/apimachinery/pkg/api/meta" "k8s.io/apimachinery/pkg/runtime" "k8s.io/client-go/rest" + "k8s.io/client-go/tools/events" "k8s.io/client-go/tools/record" "sigs.k8s.io/controller-runtime/pkg/cache" @@ -87,6 +88,10 @@ func (c *cluster) GetEventRecorderFor(name string) record.EventRecorder { return c.recorderProvider.GetEventRecorderFor(name) } +func (c *cluster) GetEventRecorder(name string) events.EventRecorder { + return c.recorderProvider.GetEventRecorder(name) +} + func (c *cluster) GetRESTMapper() meta.RESTMapper { return c.mapper } diff --git a/pkg/internal/recorder/recorder.go b/pkg/internal/recorder/recorder.go index 21f0146ba3..6b82ad6d36 100644 --- a/pkg/internal/recorder/recorder.go +++ b/pkg/internal/recorder/recorder.go @@ -24,16 +24,19 @@ import ( "github.com/go-logr/logr" corev1 "k8s.io/api/core/v1" + eventsv1 "k8s.io/api/events/v1" "k8s.io/apimachinery/pkg/runtime" corev1client "k8s.io/client-go/kubernetes/typed/core/v1" "k8s.io/client-go/rest" + "k8s.io/client-go/tools/events" "k8s.io/client-go/tools/record" ) // EventBroadcasterProducer makes an event broadcaster, returning // whether or not the broadcaster should be stopped with the Provider, // or not (e.g. if it's shared, it shouldn't be stopped with the Provider). -type EventBroadcasterProducer func() (caster record.EventBroadcaster, stopWithProvider bool) +// This producer currently produces both an old API and a new API broadcaster. +type EventBroadcasterProducer func() (deprecatedCaster record.EventBroadcaster, caster events.EventBroadcaster, stopWithProvider bool) // Provider is a recorder.Provider that records events to the k8s API server // and to a logr Logger. @@ -48,9 +51,12 @@ type Provider struct { evtClient corev1client.EventInterface makeBroadcaster EventBroadcasterProducer - broadcasterOnce sync.Once - broadcaster record.EventBroadcaster - stopBroadcaster bool + broadcasterOnce sync.Once + broadcaster events.EventBroadcaster + cancelSinkRecordingFunc context.CancelFunc + // Deprecated: will be removed in a future release. Use the broadcaster above instead. + deprecatedBroadcaster record.EventBroadcaster + stopBroadcaster bool } // NB(directxman12): this manually implements Stop instead of Being a runnable because we need to @@ -71,10 +77,12 @@ func (p *Provider) Stop(shutdownCtx context.Context) { // almost certainly already been started (e.g. by leader election). We // need to invoke this to ensure that we don't inadvertently race with // an invocation of getBroadcaster. - broadcaster := p.getBroadcaster() + deprecatedBroadcaster, broadcaster := p.getBroadcaster() if p.stopBroadcaster { p.lock.Lock() broadcaster.Shutdown() + p.cancelSinkRecordingFunc() + deprecatedBroadcaster.Shutdown() p.stopped = true p.lock.Unlock() } @@ -89,7 +97,7 @@ func (p *Provider) Stop(shutdownCtx context.Context) { // getBroadcaster ensures that a broadcaster is started for this // provider, and returns it. It's threadsafe. -func (p *Provider) getBroadcaster() record.EventBroadcaster { +func (p *Provider) getBroadcaster() (record.EventBroadcaster, events.EventBroadcaster) { // NB(directxman12): this can technically still leak if something calls // "getBroadcaster" (i.e. Emits an Event) but never calls Start, but if we // create the broadcaster in start, we could race with other things that @@ -97,17 +105,34 @@ func (p *Provider) getBroadcaster() record.EventBroadcaster { // silently swallowing events and more locking, but that seems suboptimal. p.broadcasterOnce.Do(func() { - broadcaster, stop := p.makeBroadcaster() - broadcaster.StartRecordingToSink(&corev1client.EventSinkImpl{Interface: p.evtClient}) - broadcaster.StartEventWatcher( + p.deprecatedBroadcaster, p.broadcaster, p.stopBroadcaster = p.makeBroadcaster() + + // init deprecated broadcaster + p.deprecatedBroadcaster.StartRecordingToSink(&corev1client.EventSinkImpl{Interface: p.evtClient}) + p.deprecatedBroadcaster.StartEventWatcher( func(e *corev1.Event) { p.logger.V(1).Info(e.Message, "type", e.Type, "object", e.InvolvedObject, "reason", e.Reason) }) - p.broadcaster = broadcaster - p.stopBroadcaster = stop + + // init new broadcaster + ctx, cancel := context.WithCancel(context.Background()) + p.cancelSinkRecordingFunc = cancel + if err := p.broadcaster.StartRecordingToSinkWithContext(ctx); err != nil { + p.logger.Error(err, "error starting recording for broadcaster") + } + + _, err := p.broadcaster.StartEventWatcher(func(event runtime.Object) { + e, isEvt := event.(*eventsv1.Event) + if isEvt { + p.logger.V(1).Info(e.Note, "type", e.Type, "object", e.Related, "action", e.Action, "reason", e.Reason) + } + }) + if err != nil { + p.logger.Error(err, "error starting event watcher for broadcaster") + } }) - return p.broadcaster + return p.deprecatedBroadcaster, p.broadcaster } // NewProvider create a new Provider instance. @@ -128,6 +153,15 @@ func NewProvider(config *rest.Config, httpClient *http.Client, scheme *runtime.S // GetEventRecorderFor returns an event recorder that broadcasts to this provider's // broadcaster. All events will be associated with a component of the given name. func (p *Provider) GetEventRecorderFor(name string) record.EventRecorder { + return &deprecatedRecorder{ + prov: p, + name: name, + } +} + +// GetEventRecorder returns an event recorder that broadcasts to this provider's +// broadcaster. All events will be associated with a component of the given name. +func (p *Provider) GetEventRecorder(name string) events.EventRecorder { return &lazyRecorder{ prov: p, name: name, @@ -141,18 +175,47 @@ type lazyRecorder struct { name string recOnce sync.Once - rec record.EventRecorder + rec events.EventRecorder } // ensureRecording ensures that a concrete recorder is populated for this recorder. func (l *lazyRecorder) ensureRecording() { l.recOnce.Do(func() { - broadcaster := l.prov.getBroadcaster() - l.rec = broadcaster.NewRecorder(l.prov.scheme, corev1.EventSource{Component: l.name}) + _, broadcaster := l.prov.getBroadcaster() + l.rec = broadcaster.NewRecorder(l.prov.scheme, l.name) + }) +} + +func (l *lazyRecorder) Eventf(regarding runtime.Object, related runtime.Object, eventtype, reason, action, note string, args ...any) { + l.ensureRecording() + + l.prov.lock.RLock() + if !l.prov.stopped { + l.rec.Eventf(regarding, related, eventtype, reason, action, note, args...) + } + l.prov.lock.RUnlock() +} + +// deprecatedRecorder implements the old events API during the tranisiton and will be removed in a future release. +// +// Deprecated: will be removed in a future release. +type deprecatedRecorder struct { + prov *Provider + name string + + recOnce sync.Once + rec record.EventRecorder +} + +// ensureRecording ensures that a concrete recorder is populated for this recorder. +func (l *deprecatedRecorder) ensureRecording() { + l.recOnce.Do(func() { + deprecatedBroadcaster, _ := l.prov.getBroadcaster() + l.rec = deprecatedBroadcaster.NewRecorder(l.prov.scheme, corev1.EventSource{Component: l.name}) }) } -func (l *lazyRecorder) Event(object runtime.Object, eventtype, reason, message string) { +func (l *deprecatedRecorder) Event(object runtime.Object, eventtype, reason, message string) { l.ensureRecording() l.prov.lock.RLock() @@ -161,7 +224,8 @@ func (l *lazyRecorder) Event(object runtime.Object, eventtype, reason, message s } l.prov.lock.RUnlock() } -func (l *lazyRecorder) Eventf(object runtime.Object, eventtype, reason, messageFmt string, args ...interface{}) { + +func (l *deprecatedRecorder) Eventf(object runtime.Object, eventtype, reason, messageFmt string, args ...any) { l.ensureRecording() l.prov.lock.RLock() @@ -170,7 +234,8 @@ func (l *lazyRecorder) Eventf(object runtime.Object, eventtype, reason, messageF } l.prov.lock.RUnlock() } -func (l *lazyRecorder) AnnotatedEventf(object runtime.Object, annotations map[string]string, eventtype, reason, messageFmt string, args ...interface{}) { + +func (l *deprecatedRecorder) AnnotatedEventf(object runtime.Object, annotations map[string]string, eventtype, reason, messageFmt string, args ...any) { l.ensureRecording() l.prov.lock.RLock() diff --git a/pkg/internal/recorder/recorder_integration_test.go b/pkg/internal/recorder/recorder_integration_test.go index c278fbde79..90879df889 100644 --- a/pkg/internal/recorder/recorder_integration_test.go +++ b/pkg/internal/recorder/recorder_integration_test.go @@ -21,11 +21,14 @@ import ( appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" + eventsv1 "k8s.io/api/events/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/watch" + "k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes/scheme" ref "k8s.io/client-go/tools/reference" "sigs.k8s.io/controller-runtime/pkg/controller" + "sigs.k8s.io/controller-runtime/pkg/envtest" "sigs.k8s.io/controller-runtime/pkg/handler" "sigs.k8s.io/controller-runtime/pkg/manager" "sigs.k8s.io/controller-runtime/pkg/reconcile" @@ -36,14 +39,14 @@ import ( ) var _ = Describe("recorder", func() { - Describe("recorder", func() { + Describe("deprecated recorder", func() { It("should publish events", func(ctx SpecContext) { By("Creating the Manager") cm, err := manager.New(cfg, manager.Options{}) Expect(err).NotTo(HaveOccurred()) By("Creating the Controller") - recorder := cm.GetEventRecorderFor("test-recorder") + recorder := cm.GetEventRecorderFor("test-deprecated-recorder") //nolint:staticcheck instance, err := controller.New("foo-controller", cm, controller.Options{ Reconciler: reconcile.Func( func(ctx context.Context, request reconcile.Request) (reconcile.Result, error) { @@ -66,7 +69,7 @@ var _ = Describe("recorder", func() { }() deployment := &appsv1.Deployment{ - ObjectMeta: metav1.ObjectMeta{Name: "deployment-name"}, + ObjectMeta: metav1.ObjectMeta{Name: "deprecated-deployment-name"}, Spec: appsv1.DeploymentSpec{ Selector: &metav1.LabelSelector{ MatchLabels: map[string]string{"foo": "bar"}, @@ -108,4 +111,88 @@ var _ = Describe("recorder", func() { Expect(evt.Message).To(Equal("test-msg")) }) }) + + Describe("recorder", func() { + It("should publish events", func(ctx SpecContext) { + By("Creating the Manager") + // this test needs its own env for now to not interfere with the previous one. + // Once the deprecated API is removed this can be removed. + testenv := &envtest.Environment{} + + cfg, err := testenv.Start() + Expect(err).NotTo(HaveOccurred()) + defer testenv.Stop() //nolint:errcheck + + clientset, err := kubernetes.NewForConfig(cfg) + Expect(err).NotTo(HaveOccurred()) + + cm, err := manager.New(cfg, manager.Options{}) + Expect(err).NotTo(HaveOccurred()) + + By("Creating the Controller") + recorder := cm.GetEventRecorder("test-recorder") + instance, err := controller.New("bar-controller", cm, controller.Options{ + Reconciler: reconcile.Func( + func(ctx context.Context, request reconcile.Request) (reconcile.Result, error) { + dp, err := clientset.AppsV1().Deployments(request.Namespace).Get(ctx, request.Name, metav1.GetOptions{}) + Expect(err).NotTo(HaveOccurred()) + recorder.Eventf(dp, nil, corev1.EventTypeNormal, "test-reason", "test-action", "test-msg") + return reconcile.Result{}, nil + }), + }) + Expect(err).NotTo(HaveOccurred()) + + By("Watching Resources") + err = instance.Watch(source.Kind(cm.GetCache(), &appsv1.Deployment{}, &handler.TypedEnqueueRequestForObject[*appsv1.Deployment]{})) + Expect(err).NotTo(HaveOccurred()) + + By("Starting the Manager") + go func() { + defer GinkgoRecover() + Expect(cm.Start(ctx)).NotTo(HaveOccurred()) + }() + + deployment := &appsv1.Deployment{ + ObjectMeta: metav1.ObjectMeta{Name: "deployment-name"}, + Spec: appsv1.DeploymentSpec{ + Selector: &metav1.LabelSelector{ + MatchLabels: map[string]string{"foo": "bar"}, + }, + Template: corev1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{"foo": "bar"}}, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "nginx", + Image: "nginx", + }, + }, + }, + }, + }, + } + + By("Invoking Reconciling") + deployment, err = clientset.AppsV1().Deployments("default").Create(ctx, deployment, metav1.CreateOptions{}) + Expect(err).NotTo(HaveOccurred()) + + By("Validate event is published as expected") + evtWatcher, err := clientset.EventsV1().Events("default").Watch(ctx, metav1.ListOptions{}) + Expect(err).NotTo(HaveOccurred()) + + resultEvent := <-evtWatcher.ResultChan() + + Expect(resultEvent.Type).To(Equal(watch.Added)) + evt, isEvent := resultEvent.Object.(*eventsv1.Event) + Expect(isEvent).To(BeTrue()) + + dpRef, err := ref.GetReference(scheme.Scheme, deployment) + Expect(err).NotTo(HaveOccurred()) + + Expect(evt.Regarding).To(Equal(*dpRef)) + Expect(evt.Type).To(Equal(corev1.EventTypeNormal)) + Expect(evt.Reason).To(Equal("test-reason")) + Expect(evt.Note).To(Equal("test-msg")) + }) + }) }) diff --git a/pkg/internal/recorder/recorder_test.go b/pkg/internal/recorder/recorder_test.go index e226e165a3..e592a1e189 100644 --- a/pkg/internal/recorder/recorder_test.go +++ b/pkg/internal/recorder/recorder_test.go @@ -21,15 +21,16 @@ import ( . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" "k8s.io/client-go/kubernetes/scheme" + eventsv1client "k8s.io/client-go/kubernetes/typed/events/v1" + "k8s.io/client-go/tools/events" "k8s.io/client-go/tools/record" "sigs.k8s.io/controller-runtime/pkg/internal/recorder" ) var _ = Describe("recorder.Provider", func() { - makeBroadcaster := func() (record.EventBroadcaster, bool) { return record.NewBroadcaster(), true } Describe("NewProvider", func() { It("should return a provider instance and a nil error.", func() { - provider, err := recorder.NewProvider(cfg, httpClient, scheme.Scheme, logr.Discard(), makeBroadcaster) + provider, err := recorder.NewProvider(cfg, httpClient, scheme.Scheme, logr.Discard(), makeBroadcaster()) Expect(provider).NotTo(BeNil()) Expect(err).NotTo(HaveOccurred()) }) @@ -38,18 +39,36 @@ var _ = Describe("recorder.Provider", func() { // Invalid the config cfg1 := *cfg cfg1.Host = "invalid host" - _, err := recorder.NewProvider(&cfg1, httpClient, scheme.Scheme, logr.Discard(), makeBroadcaster) + _, err := recorder.NewProvider(&cfg1, httpClient, scheme.Scheme, logr.Discard(), makeBroadcaster()) Expect(err).To(HaveOccurred()) Expect(err.Error()).To(ContainSubstring("failed to init client")) }) }) + Describe("GetEventRecorderFor", func() { + It("should return a deprecated recorder instance.", func() { + provider, err := recorder.NewProvider(cfg, httpClient, scheme.Scheme, logr.Discard(), makeBroadcaster()) + Expect(err).NotTo(HaveOccurred()) + + recorder := provider.GetEventRecorderFor("test") + Expect(recorder).NotTo(BeNil()) + }) + }) Describe("GetEventRecorder", func() { It("should return a recorder instance.", func() { - provider, err := recorder.NewProvider(cfg, httpClient, scheme.Scheme, logr.Discard(), makeBroadcaster) + provider, err := recorder.NewProvider(cfg, httpClient, scheme.Scheme, logr.Discard(), makeBroadcaster()) Expect(err).NotTo(HaveOccurred()) - recorder := provider.GetEventRecorderFor("test") + recorder := provider.GetEventRecorder("test") Expect(recorder).NotTo(BeNil()) }) }) }) + +func makeBroadcaster() func() (record.EventBroadcaster, events.EventBroadcaster, bool) { + evtCl, err := eventsv1client.NewForConfigAndClient(cfg, httpClient) + Expect(err).NotTo(HaveOccurred()) + + return func() (record.EventBroadcaster, events.EventBroadcaster, bool) { + return record.NewBroadcaster(), events.NewBroadcaster(&events.EventSinkImpl{Interface: evtCl}), true + } +} diff --git a/pkg/leaderelection/leader_election.go b/pkg/leaderelection/leader_election.go index 6c013e7992..09534df3f0 100644 --- a/pkg/leaderelection/leader_election.go +++ b/pkg/leaderelection/leader_election.go @@ -128,7 +128,7 @@ func NewResourceLock(config *rest.Config, recorderProvider recorder.Provider, op coordinationClient, resourcelock.ResourceLockConfig{ Identity: id, - EventRecorder: recorderProvider.GetEventRecorderFor(id), + EventRecorder: recorderProvider.GetEventRecorderFor(id), //nolint:staticcheck }, options.LeaderLabels, ) diff --git a/pkg/manager/internal.go b/pkg/manager/internal.go index a9f91cbdd5..859a75b947 100644 --- a/pkg/manager/internal.go +++ b/pkg/manager/internal.go @@ -32,6 +32,7 @@ import ( "k8s.io/apimachinery/pkg/runtime" kerrors "k8s.io/apimachinery/pkg/util/errors" "k8s.io/client-go/rest" + "k8s.io/client-go/tools/events" "k8s.io/client-go/tools/leaderelection" "k8s.io/client-go/tools/leaderelection/resourcelock" "k8s.io/client-go/tools/record" @@ -256,7 +257,11 @@ func (cm *controllerManager) GetCache() cache.Cache { } func (cm *controllerManager) GetEventRecorderFor(name string) record.EventRecorder { - return cm.cluster.GetEventRecorderFor(name) + return cm.cluster.GetEventRecorderFor(name) //nolint:staticcheck +} + +func (cm *controllerManager) GetEventRecorder(name string) events.EventRecorder { + return cm.cluster.GetEventRecorder(name) } func (cm *controllerManager) GetRESTMapper() meta.RESTMapper { diff --git a/pkg/manager/manager.go b/pkg/manager/manager.go index e0e94245e7..74983ddcea 100644 --- a/pkg/manager/manager.go +++ b/pkg/manager/manager.go @@ -29,7 +29,9 @@ import ( corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/meta" "k8s.io/apimachinery/pkg/runtime" + eventsv1client "k8s.io/client-go/kubernetes/typed/events/v1" "k8s.io/client-go/rest" + "k8s.io/client-go/tools/events" "k8s.io/client-go/tools/leaderelection/resourcelock" "k8s.io/client-go/tools/record" "k8s.io/utils/ptr" @@ -337,7 +339,10 @@ func New(config *rest.Config, options Options) (Manager, error) { return nil, errors.New("must specify Config") } // Set default values for options fields - options = setOptionsDefaults(options) + options, err := setOptionsDefaults(config, options) + if err != nil { + return nil, fmt.Errorf("failed setting manager default options: %w", err) + } cluster, err := cluster.New(config, func(clusterOptions *cluster.Options) { clusterOptions.Scheme = options.Scheme @@ -493,7 +498,7 @@ func defaultBaseContext() context.Context { } // setOptionsDefaults set default values for Options fields. -func setOptionsDefaults(options Options) Options { +func setOptionsDefaults(config *rest.Config, options Options) (Options, error) { // Allow newResourceLock to be mocked if options.newResourceLock == nil { options.newResourceLock = leaderelection.NewResourceLock @@ -507,14 +512,25 @@ func setOptionsDefaults(options Options) Options { // This is duplicated with pkg/cluster, we need it here // for the leader election and there to provide the user with // an EventBroadcaster + httpClient, err := rest.HTTPClientFor(config) + if err != nil { + return options, err + } + + evtCl, err := eventsv1client.NewForConfigAndClient(config, httpClient) + if err != nil { + return options, err + } + if options.EventBroadcaster == nil { // defer initialization to avoid leaking by default - options.makeBroadcaster = func() (record.EventBroadcaster, bool) { - return record.NewBroadcaster(), true + options.makeBroadcaster = func() (record.EventBroadcaster, events.EventBroadcaster, bool) { + return record.NewBroadcaster(), events.NewBroadcaster(&events.EventSinkImpl{Interface: evtCl}), true } } else { - options.makeBroadcaster = func() (record.EventBroadcaster, bool) { - return options.EventBroadcaster, false + // keep supporting the options.EventBroadcaster in the old API, but do not introduce it for the new one. + options.makeBroadcaster = func() (record.EventBroadcaster, events.EventBroadcaster, bool) { + return options.EventBroadcaster, events.NewBroadcaster(&events.EventSinkImpl{Interface: evtCl}), false } } @@ -571,5 +587,5 @@ func setOptionsDefaults(options Options) Options { options.WebhookServer = webhook.NewServer(webhook.Options{}) } - return options + return options, nil } diff --git a/pkg/manager/manager_test.go b/pkg/manager/manager_test.go index 4363d62f59..f14fa65a09 100644 --- a/pkg/manager/manager_test.go +++ b/pkg/manager/manager_test.go @@ -60,7 +60,6 @@ var _ = Describe("manger.Manager", func() { m, err := New(nil, Options{}) Expect(m).To(BeNil()) Expect(err.Error()).To(ContainSubstring("must specify Config")) - }) It("should return an error if it can't create a RestMapper", func() { @@ -70,7 +69,6 @@ var _ = Describe("manger.Manager", func() { }) Expect(m).To(BeNil()) Expect(err).To(Equal(expected)) - }) It("should return an error it can't create a client.Client", func() { @@ -207,7 +205,6 @@ var _ = Describe("manger.Manager", func() { } // Don't leak routines <-mgrDone - }) It("should disable gracefulShutdown when stopping to lead", func(ctx SpecContext) { m, err := New(cfg, Options{ @@ -443,7 +440,6 @@ var _ = Describe("manger.Manager", func() { Expect(ok).To(BeTrue()) _, isLeaseLock := cm.resourceLock.(*resourcelock.LeaseLock) Expect(isLeaseLock).To(BeTrue()) - }) It("should use the specified ResourceLock", func() { m, err := New(cfg, Options{ @@ -671,7 +667,7 @@ var _ = Describe("manger.Manager", func() { }) Describe("Start", func() { - var startSuite = func(options Options, callbacks ...func(Manager)) { + startSuite := func(options Options, callbacks ...func(Manager)) { It("should Start each Component", func(ctx SpecContext) { m, err := New(cfg, options) Expect(err).NotTo(HaveOccurred()) @@ -1256,7 +1252,6 @@ var _ = Describe("manger.Manager", func() { <-managerStopDone Expect(time.Since(beforeDone)).To(BeNumerically(">=", 1500*time.Millisecond)) }) - } Context("with defaults", func() { @@ -1790,7 +1785,6 @@ var _ = Describe("manger.Manager", func() { err = m.Start(ctx) Expect(err).To(HaveOccurred()) Expect(err.Error()).To(Equal("manager already started")) - }) }) @@ -1810,7 +1804,7 @@ var _ = Describe("manger.Manager", func() { Eventually(func() error { return goleak.Find(currentGRs) }).Should(Succeed()) }) - It("should not leak goroutines if the default event broadcaster is used & events are emitted", func(specCtx SpecContext) { + It("should not leak goroutines if the deprecated event broadcaster is used & events are emitted", func(specCtx SpecContext) { currentGRs := goleak.IgnoreCurrent() m, err := New(cfg, Options{ /* implicit: default setting for EventBroadcaster */ }) @@ -1820,7 +1814,7 @@ var _ = Describe("manger.Manager", func() { ns := corev1.Namespace{} ns.Name = "default" - recorder := m.GetEventRecorderFor("rock-and-roll") + recorder := m.GetEventRecorderFor("rock-and-roll") //nolint:staticcheck Expect(m.Add(RunnableFunc(func(_ context.Context) error { recorder.Event(&ns, "Warning", "BallroomBlitz", "yeah, yeah, yeah-yeah-yeah") return nil @@ -1858,6 +1852,54 @@ var _ = Describe("manger.Manager", func() { Eventually(func() error { return goleak.Find(currentGRs) }).Should(Succeed()) }) + It("should not leak goroutines if the default event broadcaster is used & events are emitted", func(specCtx SpecContext) { + currentGRs := goleak.IgnoreCurrent() + + m, err := New(cfg, Options{ /* implicit: default setting for EventBroadcaster */ }) + Expect(err).NotTo(HaveOccurred()) + + By("adding a runnable that emits an event") + ns := corev1.Namespace{} + ns.Name = "default" + + recorder := m.GetEventRecorder("rock-and-roll") + Expect(m.Add(RunnableFunc(func(_ context.Context) error { + recorder.Eventf(&ns, nil, "Warning", "BallroomBlitz", "dance action", "yeah, yeah, yeah-yeah-yeah") + return nil + }))).To(Succeed()) + + By("starting the manager & waiting till we've sent our event") + ctx, cancel := context.WithCancel(specCtx) + doneCh := make(chan struct{}) + go func() { + defer GinkgoRecover() + defer close(doneCh) + Expect(m.Start(ctx)).To(Succeed()) + }() + <-m.Elected() + + Eventually(func() *corev1.Event { + evts, err := clientset.CoreV1().Events("").SearchWithContext(ctx, m.GetScheme(), &ns) + Expect(err).NotTo(HaveOccurred()) + + for i, evt := range evts.Items { + if evt.Reason == "BallroomBlitz" { + return &evts.Items[i] + } + } + return nil + }).ShouldNot(BeNil()) + + By("making sure there's no extra go routines still running after we stop") + cancel() + <-doneCh + + // force-close keep-alive connections. These'll time anyway (after + // like 30s or so) but force it to speed up the tests. + clientTransport.CloseIdleConnections() + Eventually(func() error { return goleak.Find(currentGRs) }).Should(Succeed()) + }) + It("should not leak goroutines when a runnable returns error slowly after being signaled to stop", func(specCtx SpecContext) { // This test reproduces the race condition where the manager's Start method // exits due to context cancellation, leaving no one to drain errChan @@ -1938,10 +1980,15 @@ var _ = Describe("manger.Manager", func() { Expect(m.GetFieldIndexer()).To(Equal(mgr.cluster.GetFieldIndexer())) }) + It("should provide a function to get the deprecated EventRecorder", func() { + m, err := New(cfg, Options{}) + Expect(err).NotTo(HaveOccurred()) + Expect(m.GetEventRecorderFor("test")).NotTo(BeNil()) //nolint:staticcheck + }) It("should provide a function to get the EventRecorder", func() { m, err := New(cfg, Options{}) Expect(err).NotTo(HaveOccurred()) - Expect(m.GetEventRecorderFor("test")).NotTo(BeNil()) + Expect(m.GetEventRecorder("test")).NotTo(BeNil()) }) It("should provide a function to get the APIReader", func() { m, err := New(cfg, Options{}) @@ -2020,8 +2067,7 @@ var _ = Describe("manger.Manager", func() { }) }) -type runnableError struct { -} +type runnableError struct{} func (runnableError) Error() string { return "not feeling like that" diff --git a/pkg/recorder/example_test.go b/pkg/recorder/example_test.go index 969420d817..47f14ff715 100644 --- a/pkg/recorder/example_test.go +++ b/pkg/recorder/example_test.go @@ -18,31 +18,39 @@ package recorder_test import ( corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/runtime" _ "github.com/onsi/ginkgo/v2" "sigs.k8s.io/controller-runtime/pkg/recorder" ) var ( - recorderProvider recorder.Provider - somePod *corev1.Pod // the object you're reconciling, for example + recorderProvider recorder.Provider + somePod *corev1.Pod // the object you're reconciling, for example + someRelatedObject runtime.Object // another object related to the reconciled object and the event. ) func Example_event() { // recorderProvider is a recorder.Provider - recorder := recorderProvider.GetEventRecorderFor("my-controller") + deprecatedRecorder := recorderProvider.GetEventRecorderFor("my-controller") // emit an event with a fixed message - recorder.Event(somePod, corev1.EventTypeWarning, + deprecatedRecorder.Event(somePod, corev1.EventTypeWarning, "WrongTrousers", "It's the wrong trousers, Gromit!") } func Example_eventf() { // recorderProvider is a recorder.Provider - recorder := recorderProvider.GetEventRecorderFor("my-controller") + deprecatedRecorder := recorderProvider.GetEventRecorderFor("my-controller") // emit an event with a variable message mildCheese := "Wensleydale" - recorder.Eventf(somePod, corev1.EventTypeNormal, + deprecatedRecorder.Eventf(somePod, corev1.EventTypeNormal, "DislikesCheese", "Not even %s?", mildCheese) + + recorder := recorderProvider.GetEventRecorder("my-controller") + + // emit an event with a fixed message + recorder.Eventf(somePod, someRelatedObject, corev1.EventTypeWarning, + "WrongTrousers", "getting dressed", "It's the wrong trousers, Gromit!") } diff --git a/pkg/recorder/recorder.go b/pkg/recorder/recorder.go index f093f0a726..b34fecb525 100644 --- a/pkg/recorder/recorder.go +++ b/pkg/recorder/recorder.go @@ -21,11 +21,16 @@ limitations under the License. package recorder import ( + "k8s.io/client-go/tools/events" "k8s.io/client-go/tools/record" ) // Provider knows how to generate new event recorders with given name. type Provider interface { - // NewRecorder returns an EventRecorder with given name. + // GetEventRecorderFor returns an EventRecorder for the old events API. + // + // Deprecated: this uses the old events API and will be removed in a future release. Please use GetEventRecorder instead. GetEventRecorderFor(name string) record.EventRecorder + // GetEventRecorder returns a EventRecorder with given name. + GetEventRecorder(name string) events.EventRecorder }