Skip to content

⚠️ Migration to the new events API #3262

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 12 commits into
base: main
Choose a base branch
from
28 changes: 19 additions & 9 deletions pkg/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,24 +19,30 @@ package cluster
import (
"context"
"errors"
"fmt"
"net/http"

"github.com/go-logr/logr"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/kubernetes/scheme"
eventsv1client "k8s.io/client-go/kubernetes/typed/events/v1"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/events"
"k8s.io/client-go/tools/record"

"sigs.k8s.io/controller-runtime/pkg/cache"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/apiutil"
logf "sigs.k8s.io/controller-runtime/pkg/internal/log"
intrec "sigs.k8s.io/controller-runtime/pkg/internal/recorder"
"sigs.k8s.io/controller-runtime/pkg/recorder"
)

// Cluster provides various methods to interact with a cluster.
type Cluster interface {
recorder.Provider

// GetHTTPClient returns an HTTP client that can be used to talk to the apiserver
GetHTTPClient() *http.Client

Expand All @@ -58,9 +64,6 @@ type Cluster interface {
// GetFieldIndexer returns a client.FieldIndexer configured with the client
GetFieldIndexer() client.FieldIndexer

// GetEventRecorderFor returns a new EventRecorder for the provided name
GetEventRecorderFor(name string) record.EventRecorder

// GetRESTMapper returns a RESTMapper
GetRESTMapper() meta.RESTMapper

Expand Down Expand Up @@ -160,8 +163,7 @@ func New(config *rest.Config, opts ...Option) (Cluster, error) {
}
options, err := setOptionsDefaults(options, config)
if err != nil {
options.Logger.Error(err, "Failed to set defaults")
return nil, err
return nil, fmt.Errorf("failed setting cluster default options: %w", err)
}

// Create the mapper provider
Expand Down Expand Up @@ -281,16 +283,24 @@ func setOptionsDefaults(options Options, config *rest.Config) (Options, error) {
options.newRecorderProvider = intrec.NewProvider
}

// This is duplicated with pkg/manager, we need it here to provide
// the user with an EventBroadcaster and there for the Leader election
evtCl, err := eventsv1client.NewForConfigAndClient(config, options.HTTPClient)
if err != nil {
return options, err
}

// This is duplicated with pkg/manager, we need it here to provide
// the user with an EventBroadcaster and there for the Leader election
if options.EventBroadcaster == nil {
// defer initialization to avoid leaking by default
options.makeBroadcaster = func() (record.EventBroadcaster, bool) {
return record.NewBroadcaster(), true
options.makeBroadcaster = func() (record.EventBroadcaster, events.EventBroadcaster, bool) {
return record.NewBroadcaster(), events.NewBroadcaster(&events.EventSinkImpl{Interface: evtCl}), true
}
} else {
options.makeBroadcaster = func() (record.EventBroadcaster, bool) {
return options.EventBroadcaster, false
// keep supporting the options.EventBroadcaster in the old API, but do not introduce it for the new one.
options.makeBroadcaster = func() (record.EventBroadcaster, events.EventBroadcaster, bool) {
return options.EventBroadcaster, events.NewBroadcaster(&events.EventSinkImpl{Interface: evtCl}), false
}
}

Expand Down
11 changes: 7 additions & 4 deletions pkg/cluster/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ var _ = Describe("cluster.Cluster", func() {
c, err := New(nil)
Expect(c).To(BeNil())
Expect(err.Error()).To(ContainSubstring("must specify Config"))

})

It("should return an error if it can't create a RestMapper", func() {
Expand All @@ -50,7 +49,6 @@ var _ = Describe("cluster.Cluster", func() {
})
Expect(c).To(BeNil())
Expect(err).To(Equal(expected))

})

It("should return an error it can't create a client.Client", func() {
Expand Down Expand Up @@ -96,7 +94,6 @@ var _ = Describe("cluster.Cluster", func() {
Expect(err).To(HaveOccurred())
Expect(err.Error()).To(ContainSubstring("expected error"))
})

})

Describe("Start", func() {
Expand Down Expand Up @@ -160,7 +157,13 @@ var _ = Describe("cluster.Cluster", func() {
It("should provide a function to get the EventRecorder", func() {
c, err := New(cfg)
Expect(err).NotTo(HaveOccurred())
Expect(c.GetEventRecorderFor("test")).NotTo(BeNil())
Expect(c.GetEventRecorder("test")).NotTo(BeNil())
})

It("should provide a function to get the deprecated EventRecorder", func() {
c, err := New(cfg)
Expect(err).NotTo(HaveOccurred())
Expect(c.GetEventRecorderFor("test")).NotTo(BeNil()) //nolint:staticcheck
})
It("should provide a function to get the APIReader", func() {
c, err := New(cfg)
Expand Down
5 changes: 5 additions & 0 deletions pkg/cluster/internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/events"
"k8s.io/client-go/tools/record"

"sigs.k8s.io/controller-runtime/pkg/cache"
Expand Down Expand Up @@ -87,6 +88,10 @@ func (c *cluster) GetEventRecorderFor(name string) record.EventRecorder {
return c.recorderProvider.GetEventRecorderFor(name)
}

func (c *cluster) GetEventRecorder(name string) events.EventRecorder {
return c.recorderProvider.GetEventRecorder(name)
}

func (c *cluster) GetRESTMapper() meta.RESTMapper {
return c.mapper
}
Expand Down
101 changes: 83 additions & 18 deletions pkg/internal/recorder/recorder.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,16 +24,19 @@ import (

"github.com/go-logr/logr"
corev1 "k8s.io/api/core/v1"
eventsv1 "k8s.io/api/events/v1"
"k8s.io/apimachinery/pkg/runtime"
corev1client "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/events"
"k8s.io/client-go/tools/record"
)

// EventBroadcasterProducer makes an event broadcaster, returning
// whether or not the broadcaster should be stopped with the Provider,
// or not (e.g. if it's shared, it shouldn't be stopped with the Provider).
type EventBroadcasterProducer func() (caster record.EventBroadcaster, stopWithProvider bool)
// This producer currently produces both an old API and a new API broadcaster.
type EventBroadcasterProducer func() (deprecatedCaster record.EventBroadcaster, caster events.EventBroadcaster, stopWithProvider bool)

// Provider is a recorder.Provider that records events to the k8s API server
// and to a logr Logger.
Expand All @@ -48,9 +51,12 @@ type Provider struct {
evtClient corev1client.EventInterface
makeBroadcaster EventBroadcasterProducer

broadcasterOnce sync.Once
broadcaster record.EventBroadcaster
stopBroadcaster bool
broadcasterOnce sync.Once
broadcaster events.EventBroadcaster
cancelSinkRecordingFunc context.CancelFunc
// Deprecated: will be removed in a future release. Use the broadcaster above instead.
deprecatedBroadcaster record.EventBroadcaster
stopBroadcaster bool
}

// NB(directxman12): this manually implements Stop instead of Being a runnable because we need to
Expand All @@ -71,10 +77,12 @@ func (p *Provider) Stop(shutdownCtx context.Context) {
// almost certainly already been started (e.g. by leader election). We
// need to invoke this to ensure that we don't inadvertently race with
// an invocation of getBroadcaster.
broadcaster := p.getBroadcaster()
deprecatedBroadcaster, broadcaster := p.getBroadcaster()
if p.stopBroadcaster {
p.lock.Lock()
broadcaster.Shutdown()
p.cancelSinkRecordingFunc()
deprecatedBroadcaster.Shutdown()
p.stopped = true
p.lock.Unlock()
}
Expand All @@ -89,25 +97,42 @@ func (p *Provider) Stop(shutdownCtx context.Context) {

// getBroadcaster ensures that a broadcaster is started for this
// provider, and returns it. It's threadsafe.
func (p *Provider) getBroadcaster() record.EventBroadcaster {
func (p *Provider) getBroadcaster() (record.EventBroadcaster, events.EventBroadcaster) {
// NB(directxman12): this can technically still leak if something calls
// "getBroadcaster" (i.e. Emits an Event) but never calls Start, but if we
// create the broadcaster in start, we could race with other things that
// are started at the same time & want to emit events. The alternative is
// silently swallowing events and more locking, but that seems suboptimal.

p.broadcasterOnce.Do(func() {
broadcaster, stop := p.makeBroadcaster()
broadcaster.StartRecordingToSink(&corev1client.EventSinkImpl{Interface: p.evtClient})
broadcaster.StartEventWatcher(
p.deprecatedBroadcaster, p.broadcaster, p.stopBroadcaster = p.makeBroadcaster()

// init deprecated broadcaster
p.deprecatedBroadcaster.StartRecordingToSink(&corev1client.EventSinkImpl{Interface: p.evtClient})
p.deprecatedBroadcaster.StartEventWatcher(
func(e *corev1.Event) {
p.logger.V(1).Info(e.Message, "type", e.Type, "object", e.InvolvedObject, "reason", e.Reason)
})
p.broadcaster = broadcaster
p.stopBroadcaster = stop

// init new broadcaster
ctx, cancel := context.WithCancel(context.Background())
p.cancelSinkRecordingFunc = cancel
if err := p.broadcaster.StartRecordingToSinkWithContext(ctx); err != nil {
p.logger.Error(err, "error starting recording for broadcaster")
}

_, err := p.broadcaster.StartEventWatcher(func(event runtime.Object) {
e, isEvt := event.(*eventsv1.Event)
if isEvt {
p.logger.V(1).Info(e.Note, "type", e.Type, "object", e.Related, "action", e.Action, "reason", e.Reason)
}
})
if err != nil {
p.logger.Error(err, "error starting event watcher for broadcaster")
}
})

return p.broadcaster
return p.deprecatedBroadcaster, p.broadcaster
}

// NewProvider create a new Provider instance.
Expand All @@ -128,6 +153,15 @@ func NewProvider(config *rest.Config, httpClient *http.Client, scheme *runtime.S
// GetEventRecorderFor returns an event recorder that broadcasts to this provider's
// broadcaster. All events will be associated with a component of the given name.
func (p *Provider) GetEventRecorderFor(name string) record.EventRecorder {
return &deprecatedRecorder{
prov: p,
name: name,
}
}

// GetEventRecorder returns an event recorder that broadcasts to this provider's
// broadcaster. All events will be associated with a component of the given name.
func (p *Provider) GetEventRecorder(name string) events.EventRecorder {
return &lazyRecorder{
prov: p,
name: name,
Expand All @@ -141,18 +175,47 @@ type lazyRecorder struct {
name string

recOnce sync.Once
rec record.EventRecorder
rec events.EventRecorder
}

// ensureRecording ensures that a concrete recorder is populated for this recorder.
func (l *lazyRecorder) ensureRecording() {
l.recOnce.Do(func() {
broadcaster := l.prov.getBroadcaster()
l.rec = broadcaster.NewRecorder(l.prov.scheme, corev1.EventSource{Component: l.name})
_, broadcaster := l.prov.getBroadcaster()
l.rec = broadcaster.NewRecorder(l.prov.scheme, l.name)
})
}

func (l *lazyRecorder) Eventf(regarding runtime.Object, related runtime.Object, eventtype, reason, action, note string, args ...any) {
l.ensureRecording()

l.prov.lock.RLock()
if !l.prov.stopped {
l.rec.Eventf(regarding, related, eventtype, reason, action, note, args...)
}
l.prov.lock.RUnlock()
}

// deprecatedRecorder implements the old events API during the tranisiton and will be removed in a future release.
//
// Deprecated: will be removed in a future release.
type deprecatedRecorder struct {
prov *Provider
name string

recOnce sync.Once
rec record.EventRecorder
}

// ensureRecording ensures that a concrete recorder is populated for this recorder.
func (l *deprecatedRecorder) ensureRecording() {
l.recOnce.Do(func() {
deprecatedBroadcaster, _ := l.prov.getBroadcaster()
l.rec = deprecatedBroadcaster.NewRecorder(l.prov.scheme, corev1.EventSource{Component: l.name})
})
}

func (l *lazyRecorder) Event(object runtime.Object, eventtype, reason, message string) {
func (l *deprecatedRecorder) Event(object runtime.Object, eventtype, reason, message string) {
l.ensureRecording()

l.prov.lock.RLock()
Expand All @@ -161,7 +224,8 @@ func (l *lazyRecorder) Event(object runtime.Object, eventtype, reason, message s
}
l.prov.lock.RUnlock()
}
func (l *lazyRecorder) Eventf(object runtime.Object, eventtype, reason, messageFmt string, args ...interface{}) {

func (l *deprecatedRecorder) Eventf(object runtime.Object, eventtype, reason, messageFmt string, args ...any) {
l.ensureRecording()

l.prov.lock.RLock()
Expand All @@ -170,7 +234,8 @@ func (l *lazyRecorder) Eventf(object runtime.Object, eventtype, reason, messageF
}
l.prov.lock.RUnlock()
}
func (l *lazyRecorder) AnnotatedEventf(object runtime.Object, annotations map[string]string, eventtype, reason, messageFmt string, args ...interface{}) {

func (l *deprecatedRecorder) AnnotatedEventf(object runtime.Object, annotations map[string]string, eventtype, reason, messageFmt string, args ...any) {
l.ensureRecording()

l.prov.lock.RLock()
Expand Down
Loading